[ML] Show C++ process info in X-Pack's info response (elastic/x-pack-elasticsearch#581)

The /_xpack endpoint now has a "native_code_info" value in the feature
info for ml

Original commit: elastic/x-pack-elasticsearch@6b4b408f4a
This commit is contained in:
David Roberts 2017-02-18 20:14:52 +00:00 committed by GitHub
parent 8a6cea0350
commit 0d1181eabb
14 changed files with 214 additions and 13 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.license;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -230,16 +231,20 @@ public class XPackInfoResponse extends ActionResponse {
@Nullable private final String description;
private final boolean available;
private final boolean enabled;
@Nullable private final Map<String, Object> nativeCodeInfo;
public FeatureSet(StreamInput in) throws IOException {
this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean());
this(in.readString(), in.readOptionalString(), in.readBoolean(), in.readBoolean(),
in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED) ? in.readMap() : null);
}
public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled) {
public FeatureSet(String name, @Nullable String description, boolean available, boolean enabled,
@Nullable Map<String, Object> nativeCodeInfo) {
this.name = name;
this.description = description;
this.available = available;
this.enabled = enabled;
this.nativeCodeInfo = nativeCodeInfo;
}
public String name() {
@ -259,6 +264,11 @@ public class XPackInfoResponse extends ActionResponse {
return enabled;
}
@Nullable
public Map<String, Object> nativeCodeInfo() {
return nativeCodeInfo;
}
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (description != null) {
@ -266,6 +276,9 @@ public class XPackInfoResponse extends ActionResponse {
}
builder.field("available", available);
builder.field("enabled", enabled);
if (nativeCodeInfo != null) {
builder.field("native_code_info", nativeCodeInfo);
}
return builder.endObject();
}
@ -274,6 +287,9 @@ public class XPackInfoResponse extends ActionResponse {
out.writeOptionalString(description);
out.writeBoolean(available);
out.writeBoolean(enabled);
if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) {
out.writeMap(nativeCodeInfo);
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
public interface XPackFeatureSet {
@ -23,6 +24,8 @@ public interface XPackFeatureSet {
boolean enabled();
Map<String, Object> nativeCodeInfo();
Usage usage();
abstract class Usage implements ToXContentObject, NamedWriteable {

View File

@ -59,7 +59,8 @@ public class TransportXPackInfoAction extends HandledTransportAction<XPackInfoRe
XPackInfoResponse.FeatureSetsInfo featureSetsInfo = null;
if (request.getCategories().contains(XPackInfoRequest.Category.FEATURES)) {
Set<FeatureSet> featureSets = this.featureSets.stream().map(fs ->
new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled()))
new FeatureSet(fs.name(), request.isVerbose() ? fs.description() : null, fs.available(), fs.enabled(),
request.isVerbose() ? fs.nativeCodeInfo() : null))
.collect(Collectors.toSet());
featureSetsInfo = new XPackInfoResponse.FeatureSetsInfo(featureSets);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.graph;
import java.io.IOException;
import java.util.Map;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -47,6 +48,11 @@ public class GraphFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled());

View File

@ -81,6 +81,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.ProcessCtrl;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -256,12 +257,17 @@ public class MachineLearning extends Plugin implements ActionPlugin {
NormalizerProcessFactory normalizerProcessFactory;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
try {
NativeController nativeController = new NativeController(env, new NamedPipeHelper());
nativeController.tailLogsInThread();
NativeController nativeController = NativeControllerHolder.getNativeController(settings);
if (nativeController == null) {
// This will only only happen when path.home is not set, which is disallowed in production
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
}
autodetectProcessFactory = new NativeAutodetectProcessFactory(jobProvider, env, settings, nativeController, client);
normalizerProcessFactory = new NativeNormalizerProcessFactory(env, settings, nativeController);
} catch (IOException e) {
throw new ElasticsearchException("Failed to create native process factories", e);
// This also should not happen in production, as the MachineLearningFeatureSet should have
// hit the same error first and brought down the node with a friendlier error message
throw new ElasticsearchException("Failed to create native process factories for Machine Learning", e);
}
} else {
autodetectProcessFactory = (jobDetails, modelSnapshot, quantiles, filters, ignoreDowntime, executorService) ->

View File

@ -5,26 +5,47 @@
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.XPackFeatureSet;
import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class MachineLearningFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final XPackLicenseState licenseState;
private final Map<String, Object> nativeCodeInfo;
@Inject
public MachineLearningFeatureSet(Settings settings, @Nullable XPackLicenseState licenseState) {
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
this.licenseState = licenseState;
Map<String, Object> nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO;
try {
NativeController nativeController = NativeControllerHolder.getNativeController(settings);
if (nativeController != null) {
nativeCodeInfo = nativeController.getNativeCodeInfo();
}
} catch (IOException | TimeoutException e) {
Loggers.getLogger(MachineLearningFeatureSet.class).error("Cannot get native code info for Machine Learning", e);
if (enabled) {
throw new ElasticsearchException("Cannot communicate with Machine Learning native code "
+ "- please check that you are running on a supported platform");
}
}
this.nativeCodeInfo = nativeCodeInfo;
}
@Override
@ -47,6 +68,11 @@ public class MachineLearningFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return nativeCodeInfo;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled());

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
@ -15,8 +16,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@ -30,6 +36,15 @@ public class NativeController {
private static final String START_COMMAND = "start";
public static final Map<String, Object> UNKNOWN_NATIVE_CODE_INFO;
static {
Map<String, Object> unknownInfo = new HashMap<>(2);
unknownInfo.put("version", "N/A");
unknownInfo.put("build_hash", "N/A");
UNKNOWN_NATIVE_CODE_INFO = Collections.unmodifiableMap(unknownInfo);
}
private final CppLogMessageHandler cppLogHandler;
private final OutputStream commandStream;
private Thread logTailThread;
@ -59,6 +74,23 @@ public class NativeController {
return cppLogHandler.getPid(CONTROLLER_CONNECT_TIMEOUT);
}
public Map<String, Object> getNativeCodeInfo() throws TimeoutException {
String copyrightMessage = cppLogHandler.getCppCopyright(CONTROLLER_CONNECT_TIMEOUT);
Matcher matcher = Pattern.compile("Version (.+) \\(Build ([0-9a-f]+)\\) Copyright ").matcher(copyrightMessage);
if (matcher.find()) {
Map<String, Object> info = new HashMap<>(2);
info.put("version", matcher.group(1));
info.put("build_hash", matcher.group(2));
return info;
} else {
// If this happens it probably means someone has changed the format in lib/ver/CBuildInfo.cc
// in the machine-learning-cpp repo without changing the pattern above to match
String msg = "Unexpected native controller process copyright format: " + copyrightMessage;
LOGGER.error(msg);
throw new ElasticsearchException(msg);
}
}
public void startProcess(List<String> command) throws IOException {
// Sanity check to avoid hard-to-debug errors - tabs and newlines will confuse the controller process
for (String arg : command) {

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import java.io.IOException;
/**
* Manages a singleton NativeController so that both the MachineLearningFeatureSet and MachineLearning classes can
* get access to the same one.
*/
public class NativeControllerHolder {
private static final Object lock = new Object();
private static NativeController nativeController;
private NativeControllerHolder() {
}
/**
* Get a reference to the singleton native process controller.
*
* The NativeController is created lazily to allow time for the C++ process to be started before connection is attempted.
*
* null is returned to tests that haven't bothered to set up path.home and all runs where useNativeProcess=false.
*
* Calls may throw an exception if initial connection to the C++ process fails.
*/
public static NativeController getNativeController(Settings settings) throws IOException {
if (Environment.PATH_HOME_SETTING.exists(settings) && MachineLearning.USE_NATIVE_PROCESS_OPTION.get(settings)) {
synchronized (lock) {
if (nativeController == null) {
nativeController = new NativeController(new Environment(settings), new NamedPipeHelper());
nativeController.tailLogsInThread();
}
}
return nativeController;
}
return null;
}
}

View File

@ -47,6 +47,7 @@ public class CppLogMessageHandler implements Closeable {
private final int errorStoreSize;
private final Deque<String> errorStore;
private final CountDownLatch pidLatch;
private final CountDownLatch cppCopyrightLatch;
private volatile boolean hasLogStreamEnded;
private volatile boolean seenFatalError;
private volatile long pid;
@ -70,6 +71,7 @@ public class CppLogMessageHandler implements Closeable {
this.errorStoreSize = errorStoreSize;
errorStore = ConcurrentCollections.newDeque();
pidLatch = new CountDownLatch(1);
cppCopyrightLatch = new CountDownLatch(1);
hasLogStreamEnded = false;
}
@ -133,7 +135,23 @@ public class CppLogMessageHandler implements Closeable {
return pid;
}
public String getCppCopyright() {
/**
* Get the process ID of the C++ process whose log messages are being read. This will
* arrive in the first log message logged by the C++ process. They all log a copyright
* message immediately on startup so it should not take long to arrive, but will not be
* available instantly after the process starts.
*/
public String getCppCopyright(Duration timeout) throws TimeoutException {
if (cppCopyright == null) {
try {
cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (cppCopyright == null) {
throw new TimeoutException("Timed out waiting for C++ process copyright");
}
}
return cppCopyright;
}
@ -193,6 +211,7 @@ public class CppLogMessageHandler implements Closeable {
String latestMessage = msg.getMessage();
if (cppCopyright == null && latestMessage.contains("Copyright")) {
cppCopyright = latestMessage;
cppCopyrightLatch.countDown();
}
// TODO: Is there a way to preserve the original timestamp when re-logging?
if (jobId != null) {

View File

@ -54,6 +54,11 @@ public class MonitoringFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled(), exportersUsage(exporters));

View File

@ -83,6 +83,11 @@ public class SecurityFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
Map<String, Object> realmsUsage = buildRealmsUsage(realms);

View File

@ -53,6 +53,11 @@ public class WatcherFeatureSet implements XPackFeatureSet {
return enabled;
}
@Override
public Map<String, Object> nativeCodeInfo() {
return null;
}
@Override
public XPackFeatureSet.Usage usage() {
return new Usage(available(), enabled(), watcherService != null ? watcherService.usageStats() : Collections.emptyMap());

View File

@ -18,6 +18,8 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
@ -25,9 +27,9 @@ import static org.mockito.Mockito.when;
public class NativeControllerTests extends ESTestCase {
public void testNativeController() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = new Environment(settings);
private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
public void testStartProcessCommand() throws IOException {
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]);
@ -43,10 +45,34 @@ public class NativeControllerTests extends ESTestCase {
command.add("--arg2=42");
command.add("--arg3=something with spaces");
NativeController nativeController = new NativeController(env, namedPipeHelper);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.startProcess(command);
assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
commandStream.toString(StandardCharsets.UTF_8.name()));
}
public void testGetNativeCodeInfo() throws IOException, TimeoutException {
String testMessage = "{\"logger\":\"controller\",\"timestamp\":1478261151445,\"level\":\"INFO\",\"pid\":10211,"
+ "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n";
NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(testMessage.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class)))
.thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class)))
.thenReturn(commandStream);
NativeController nativeController = new NativeController(new Environment(settings), namedPipeHelper);
nativeController.tailLogsInThread();
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();
assertNotNull(nativeCodeInfo);
assertEquals(2, nativeCodeInfo.size());
assertEquals("6.0.0-alpha1-SNAPSHOT", nativeCodeInfo.get("version"));
assertEquals("a0d6ef8819418c", nativeCodeInfo.get("build_hash"));
}
}

View File

@ -42,9 +42,11 @@ public class CppLogMessageHandlerTests extends ESTestCase {
handler.tailStream();
assertTrue(handler.hasLogStreamEnded());
assertEquals(10211L, handler.getPid(Duration.ofMillis(1)));
// Since this is all being done in one thread and we know the stream has
// been completely consumed at this point the wait duration can be zero
assertEquals(10211L, handler.getPid(Duration.ZERO));
assertEquals("controller (64 bit): Version based on 6.0.0-alpha1 (Build b0d6ef8819418c) "
+ "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright());
+ "Copyright (c) 2017 Elasticsearch BV", handler.getCppCopyright(Duration.ZERO));
assertEquals("Did not understand verb 'a'\n", handler.getErrors());
assertFalse(handler.seenFatalError());
}