diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 0e77e3d6e3a..fe8d4121c75 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -301,6 +301,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable { public static class OsStats implements ToXContent, Streamable { int availableProcessors; + int allocatedProcessors; long availableMemory; final ObjectIntHashMap names; @@ -310,6 +311,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable { public void addNodeInfo(NodeInfo nodeInfo) { availableProcessors += nodeInfo.getOs().getAvailableProcessors(); + allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors(); + if (nodeInfo.getOs().getName() != null) { names.addTo(nodeInfo.getOs().getName(), 1); } @@ -319,6 +322,10 @@ public class ClusterStatsNodes implements ToXContent, Streamable { return availableProcessors; } + public int getAllocatedProcessors() { + return allocatedProcessors; + } + public ByteSizeValue getAvailableMemory() { return new ByteSizeValue(availableMemory); } @@ -326,6 +333,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable { @Override public void readFrom(StreamInput in) throws IOException { availableProcessors = in.readVInt(); + allocatedProcessors = in.readVInt(); availableMemory = in.readLong(); int size = in.readVInt(); names.clear(); @@ -337,6 +345,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(availableProcessors); + out.writeVInt(allocatedProcessors); out.writeLong(availableMemory); out.writeVInt(names.size()); for (ObjectIntCursor name : names) { @@ -353,6 +362,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable { static final class Fields { static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors"); + static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors"); static final XContentBuilderString NAME = new XContentBuilderString("name"); static final XContentBuilderString NAMES = new XContentBuilderString("names"); static final XContentBuilderString MEM = new XContentBuilderString("mem"); @@ -364,6 +374,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors); + builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors); builder.startObject(Fields.MEM); builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory); builder.endObject(); diff --git a/core/src/main/java/org/elasticsearch/monitor/os/OsInfo.java b/core/src/main/java/org/elasticsearch/monitor/os/OsInfo.java index bfa12af427d..f34cd51a143 100644 --- a/core/src/main/java/org/elasticsearch/monitor/os/OsInfo.java +++ b/core/src/main/java/org/elasticsearch/monitor/os/OsInfo.java @@ -34,6 +34,8 @@ public class OsInfo implements Streamable, ToXContent { int availableProcessors; + int allocatedProcessors; + String name = null; String arch = null; String version = null; @@ -49,6 +51,10 @@ public class OsInfo implements Streamable, ToXContent { return this.availableProcessors; } + public int getAllocatedProcessors() { + return this.allocatedProcessors; + } + public String getName() { return name; } @@ -69,6 +75,7 @@ public class OsInfo implements Streamable, ToXContent { static final XContentBuilderString REFRESH_INTERVAL = new XContentBuilderString("refresh_interval"); static final XContentBuilderString REFRESH_INTERVAL_IN_MILLIS = new XContentBuilderString("refresh_interval_in_millis"); static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors"); + static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors"); } @Override @@ -85,6 +92,7 @@ public class OsInfo implements Streamable, ToXContent { builder.field(Fields.VERSION, version); } builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors); + builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors); builder.endObject(); return builder; } @@ -99,11 +107,13 @@ public class OsInfo implements Streamable, ToXContent { public void readFrom(StreamInput in) throws IOException { refreshInterval = in.readLong(); availableProcessors = in.readInt(); + allocatedProcessors = in.readInt(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(refreshInterval); out.writeInt(availableProcessors); + out.writeInt(allocatedProcessors); } } diff --git a/core/src/main/java/org/elasticsearch/monitor/os/OsService.java b/core/src/main/java/org/elasticsearch/monitor/os/OsService.java index 6c54b877dc2..0d4e896f5c3 100644 --- a/core/src/main/java/org/elasticsearch/monitor/os/OsService.java +++ b/core/src/main/java/org/elasticsearch/monitor/os/OsService.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.SingleObjectCache; +import org.elasticsearch.common.util.concurrent.EsExecutors; /** * @@ -45,6 +46,8 @@ public class OsService extends AbstractComponent { this.info = probe.osInfo(); this.info.refreshInterval = refreshInterval.millis(); + this.info.allocatedProcessors = EsExecutors.boundedNumberOfProcessors(settings); + osStatsCache = new OsStatsCache(refreshInterval, probe.osStats()); logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java index e23ce270352..2c3357f3ceb 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.store.Store; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -35,6 +36,7 @@ import java.io.IOException; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0) @@ -86,7 +88,6 @@ public class ClusterStatsIT extends ESIntegTestCase { ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN)); - prepareCreate("test1").setSettings("number_of_shards", 2, "number_of_replicas", 1).get(); ensureYellow(); response = client().admin().cluster().prepareClusterStats().get(); @@ -157,4 +158,16 @@ public class ClusterStatsIT extends ESIntegTestCase { assertThat(msg, response.nodesStats.getProcess().getMaxOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(-1L)); } + + public void testAllocatedProcessors() throws Exception { + // stop all other nodes + internalCluster().ensureAtMostNumDataNodes(0); + + // start one node with 7 processors. + internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS, 7).build()).get(); + waitForNodes(1); + + ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); + assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7)); + } } diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index 5ae598ff2d2..6b23bb09f24 100644 --- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -22,6 +22,8 @@ package org.elasticsearch.nodesinfo; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; @@ -29,8 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import java.util.List; import static org.elasticsearch.client.Requests.nodesInfoRequest; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; /** * @@ -81,4 +82,36 @@ public class SimpleNodesInfoIT extends ESIntegTestCase { assertThat(response.getNodes().length, is(1)); assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); } + + public void testAllocatedProcessors() throws Exception { + List nodesIds = internalCluster(). + startNodesAsync( + Settings.builder().put(EsExecutors.PROCESSORS, 3).build(), + Settings.builder().put(EsExecutors.PROCESSORS, 6).build() + ).get(); + + final String node_1 = nodesIds.get(0); + final String node_2 = nodesIds.get(1); + + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get(); + logger.info("--> done cluster_health, status " + clusterHealth.getStatus()); + + String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId(); + String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId(); + logger.info("--> started nodes: " + server1NodeId + " and " + server2NodeId); + + NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet(); + + assertThat(response.getNodes().length, is(2)); + assertThat(response.getNodesMap().get(server1NodeId), notNullValue()); + assertThat(response.getNodesMap().get(server2NodeId), notNullValue()); + + assertThat(response.getNodesMap().get(server1NodeId).getOs().getAvailableProcessors(), + equalTo(Runtime.getRuntime().availableProcessors())); + assertThat(response.getNodesMap().get(server2NodeId).getOs().getAvailableProcessors(), + equalTo(Runtime.getRuntime().availableProcessors())); + + assertThat(response.getNodesMap().get(server1NodeId).getOs().getAllocatedProcessors(), equalTo(3)); + assertThat(response.getNodesMap().get(server2NodeId).getOs().getAllocatedProcessors(), equalTo(6)); + } } diff --git a/docs/reference/cluster/nodes-info.asciidoc b/docs/reference/cluster/nodes-info.asciidoc index 215e8449f37..a3072768ca6 100644 --- a/docs/reference/cluster/nodes-info.asciidoc +++ b/docs/reference/cluster/nodes-info.asciidoc @@ -54,6 +54,11 @@ the operating system: `os.available_processors`:: Number of processors available to the Java virtual machine +`os.allocated_processors`:: + The number of processors actually used to calculate thread pool size. This number can be set + with the `processors` setting of a node and defaults to the number of processors reported by the OS. + In both cases this number will never be larger than 32. + [float] [[process-info]] ==== Process information