diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index ebcaab8495e..f3fb443673d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -24,21 +25,29 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String DETECTORS = "detectors"; public static final String FORECASTS = "forecasts"; public static final String MODEL_SIZE = "model_size"; + public static final String NODE_COUNT = "node_count"; private final Map jobsUsage; private final Map datafeedsUsage; + private final int nodeCount; public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map jobsUsage, - Map datafeedsUsage) { + Map datafeedsUsage, int nodeCount) { super(XPackField.MACHINE_LEARNING, available, enabled); this.jobsUsage = Objects.requireNonNull(jobsUsage); this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage); + this.nodeCount = nodeCount; } public MachineLearningFeatureSetUsage(StreamInput in) throws IOException { super(in); this.jobsUsage = in.readMap(); this.datafeedsUsage = in.readMap(); + if (in.getVersion().onOrAfter(Version.V_6_5_0)) { + this.nodeCount = in.readInt(); + } else { + this.nodeCount = -1; + } } @Override @@ -46,6 +55,9 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { super.writeTo(out); out.writeMap(jobsUsage); out.writeMap(datafeedsUsage); + if (out.getVersion().onOrAfter(Version.V_6_5_0)) { + out.writeInt(nodeCount); + } } @Override @@ -57,6 +69,9 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { if (datafeedsUsage != null) { builder.field(DATAFEEDS_FIELD, datafeedsUsage); } + if (nodeCount >= 0) { + builder.field(NODE_COUNT, nodeCount); + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 14ac43fae10..b5ff2e2a7de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -132,7 +133,22 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { @Override public void usage(ActionListener listener) { ClusterState state = clusterService.state(); - new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener); + new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled(), mlNodeCount(state)).execute(listener); + } + + private int mlNodeCount(final ClusterState clusterState) { + if (enabled == false) { + return 0; + } + + int mlNodeCount = 0; + for (DiscoveryNode node : clusterState.getNodes()) { + String enabled = node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR); + if (Boolean.parseBoolean(enabled)) { + ++mlNodeCount; + } + } + return mlNodeCount; } public static class Retriever { @@ -143,19 +159,22 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { private final boolean enabled; private Map jobsUsage; private Map datafeedsUsage; + private int nodeCount; - public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled) { + public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled, int nodeCount) { this.client = Objects.requireNonNull(client); this.mlMetadata = mlMetadata; this.available = available; this.enabled = enabled; this.jobsUsage = new LinkedHashMap<>(); this.datafeedsUsage = new LinkedHashMap<>(); + this.nodeCount = nodeCount; } public void execute(ActionListener listener) { if (enabled == false) { - listener.onResponse(new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap())); + listener.onResponse( + new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0)); return; } @@ -164,11 +183,9 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { ActionListener.wrap(response -> { addDatafeedsUsage(response); listener.onResponse(new MachineLearningFeatureSetUsage( - available, enabled, jobsUsage, datafeedsUsage)); + available, enabled, jobsUsage, datafeedsUsage, nodeCount)); }, - error -> { - listener.onFailure(error); - } + listener::onFailure ); // Step 1. Extract usage from jobs stats and then request stats for all datafeeds @@ -181,9 +198,7 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, datafeedStatsListener); }, - error -> { - listener.onFailure(error); - } + listener::onFailure ); // Step 0. Kick off the chain of callbacks by requesting jobs stats diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index 5278171d438..ff6a5451268 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -6,14 +6,19 @@ package org.elasticsearch.xpack.ml; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -46,7 +51,11 @@ import org.junit.Before; import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -223,6 +232,49 @@ public class MachineLearningFeatureSetTests extends ESTestCase { } } + public void testNodeCount() throws Exception { + when(licenseState.isMachineLearningAllowed()).thenReturn(true); + int nodeCount = randomIntBetween(1, 3); + givenNodeCount(nodeCount); + Settings.Builder settings = Settings.builder().put(commonSettings); + settings.put("xpack.ml.enabled", true); + MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), + clusterService, client, licenseState); + + PlainActionFuture future = new PlainActionFuture<>(); + featureSet.usage(future); + XPackFeatureSet.Usage usage = future.get(); + + assertThat(usage.available(), is(true)); + assertThat(usage.enabled(), is(true)); + + BytesStreamOutput out = new BytesStreamOutput(); + usage.writeTo(out); + XPackFeatureSet.Usage serializedUsage = new MachineLearningFeatureSetUsage(out.bytes().streamInput()); + + XContentSource source; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + serializedUsage.toXContent(builder, ToXContent.EMPTY_PARAMS); + source = new XContentSource(builder); + } + assertThat(source.getValue("node_count"), equalTo(nodeCount)); + + BytesStreamOutput oldOut = new BytesStreamOutput(); + oldOut.setVersion(Version.V_6_0_0); + usage.writeTo(oldOut); + StreamInput oldInput = oldOut.bytes().streamInput(); + oldInput.setVersion(Version.V_6_0_0); + XPackFeatureSet.Usage oldSerializedUsage = new MachineLearningFeatureSetUsage(oldInput); + + XContentSource oldSource; + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + oldSerializedUsage.toXContent(builder, ToXContent.EMPTY_PARAMS); + oldSource = new XContentSource(builder); + } + + assertNull(oldSource.getValue("node_count")); + } + public void testUsageGivenMlMetadataNotInstalled() throws Exception { when(licenseState.isMachineLearningAllowed()).thenReturn(true); Settings.Builder settings = Settings.builder().put(commonSettings); @@ -286,6 +338,37 @@ public class MachineLearningFeatureSetTests extends ESTestCase { }).when(client).execute(same(GetJobsStatsAction.INSTANCE), any(), any()); } + private void givenNodeCount(int nodeCount) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < nodeCount; i++) { + Map attrs = new HashMap<>(); + attrs.put(MachineLearning.ML_ENABLED_NODE_ATTR, Boolean.toString(true)); + Set roles = new HashSet<>(); + roles.add(DiscoveryNode.Role.DATA); + roles.add(DiscoveryNode.Role.MASTER); + roles.add(DiscoveryNode.Role.INGEST); + nodesBuilder.add(new DiscoveryNode(randomAlphaOfLength(i+1), + new TransportAddress(TransportAddress.META_ADDRESS, 9100 + i), + attrs, + roles, + Version.CURRENT)); + } + for (int i = 0; i < randomIntBetween(1, 3); i++) { + Map attrs = new HashMap<>(); + Set roles = new HashSet<>(); + roles.add(DiscoveryNode.Role.DATA); + roles.add(DiscoveryNode.Role.MASTER); + roles.add(DiscoveryNode.Role.INGEST); + nodesBuilder.add(new DiscoveryNode(randomAlphaOfLength(i+1), + new TransportAddress(TransportAddress.META_ADDRESS, 9300 + i), + attrs, + roles, + Version.CURRENT)); + } + ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE).nodes(nodesBuilder.build()).build(); + when(clusterService.state()).thenReturn(clusterState); + } + private void givenDatafeeds(List datafeedStats) { doAnswer(invocationOnMock -> { ActionListener listener =