Merge pull request #14409 from xuzha/allocated_processors

Add os.allocated_processors stats

closes #13917
This commit is contained in:
Xu Zhang 2015-11-03 09:55:44 -08:00
commit fe75c91de6
6 changed files with 78 additions and 3 deletions

View File

@ -301,6 +301,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
public static class OsStats implements ToXContent, Streamable {
int availableProcessors;
int allocatedProcessors;
long availableMemory;
final ObjectIntHashMap<String> names;
@ -310,6 +311,8 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
public void addNodeInfo(NodeInfo nodeInfo) {
availableProcessors += nodeInfo.getOs().getAvailableProcessors();
allocatedProcessors += nodeInfo.getOs().getAllocatedProcessors();
if (nodeInfo.getOs().getName() != null) {
names.addTo(nodeInfo.getOs().getName(), 1);
}
@ -319,6 +322,10 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
return availableProcessors;
}
public int getAllocatedProcessors() {
return allocatedProcessors;
}
public ByteSizeValue getAvailableMemory() {
return new ByteSizeValue(availableMemory);
}
@ -326,6 +333,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
availableProcessors = in.readVInt();
allocatedProcessors = in.readVInt();
availableMemory = in.readLong();
int size = in.readVInt();
names.clear();
@ -337,6 +345,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(availableProcessors);
out.writeVInt(allocatedProcessors);
out.writeLong(availableMemory);
out.writeVInt(names.size());
for (ObjectIntCursor<String> name : names) {
@ -353,6 +362,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
static final class Fields {
static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors");
static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
static final XContentBuilderString NAME = new XContentBuilderString("name");
static final XContentBuilderString NAMES = new XContentBuilderString("names");
static final XContentBuilderString MEM = new XContentBuilderString("mem");
@ -364,6 +374,7 @@ public class ClusterStatsNodes implements ToXContent, Streamable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
builder.startObject(Fields.MEM);
builder.byteSizeField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, availableMemory);
builder.endObject();

View File

@ -34,6 +34,8 @@ public class OsInfo implements Streamable, ToXContent {
int availableProcessors;
int allocatedProcessors;
String name = null;
String arch = null;
String version = null;
@ -49,6 +51,10 @@ public class OsInfo implements Streamable, ToXContent {
return this.availableProcessors;
}
public int getAllocatedProcessors() {
return this.allocatedProcessors;
}
public String getName() {
return name;
}
@ -69,6 +75,7 @@ public class OsInfo implements Streamable, ToXContent {
static final XContentBuilderString REFRESH_INTERVAL = new XContentBuilderString("refresh_interval");
static final XContentBuilderString REFRESH_INTERVAL_IN_MILLIS = new XContentBuilderString("refresh_interval_in_millis");
static final XContentBuilderString AVAILABLE_PROCESSORS = new XContentBuilderString("available_processors");
static final XContentBuilderString ALLOCATED_PROCESSORS = new XContentBuilderString("allocated_processors");
}
@Override
@ -85,6 +92,7 @@ public class OsInfo implements Streamable, ToXContent {
builder.field(Fields.VERSION, version);
}
builder.field(Fields.AVAILABLE_PROCESSORS, availableProcessors);
builder.field(Fields.ALLOCATED_PROCESSORS, allocatedProcessors);
builder.endObject();
return builder;
}
@ -99,11 +107,13 @@ public class OsInfo implements Streamable, ToXContent {
public void readFrom(StreamInput in) throws IOException {
refreshInterval = in.readLong();
availableProcessors = in.readInt();
allocatedProcessors = in.readInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(refreshInterval);
out.writeInt(availableProcessors);
out.writeInt(allocatedProcessors);
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.EsExecutors;
/**
*
@ -45,6 +46,8 @@ public class OsService extends AbstractComponent {
this.info = probe.osInfo();
this.info.refreshInterval = refreshInterval.millis();
this.info.allocatedProcessors = EsExecutors.boundedNumberOfProcessors(settings);
osStatsCache = new OsStatsCache(refreshInterval, probe.osStats());
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -35,6 +36,7 @@ import java.io.IOException;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
@ -86,7 +88,6 @@ public class ClusterStatsIT extends ESIntegTestCase {
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), Matchers.equalTo(ClusterHealthStatus.GREEN));
prepareCreate("test1").setSettings("number_of_shards", 2, "number_of_replicas", 1).get();
ensureYellow();
response = client().admin().cluster().prepareClusterStats().get();
@ -157,4 +158,16 @@ public class ClusterStatsIT extends ESIntegTestCase {
assertThat(msg, response.nodesStats.getProcess().getMaxOpenFileDescriptors(), Matchers.greaterThanOrEqualTo(-1L));
}
public void testAllocatedProcessors() throws Exception {
// stop all other nodes
internalCluster().ensureAtMostNumDataNodes(0);
// start one node with 7 processors.
internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS, 7).build()).get();
waitForNodes(1);
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getNodesStats().getOs().getAllocatedProcessors(), equalTo(7));
}
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.nodesinfo;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -29,8 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
import java.util.List;
import static org.elasticsearch.client.Requests.nodesInfoRequest;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
/**
*
@ -81,4 +82,36 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
assertThat(response.getNodes().length, is(1));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}
public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().
startNodesAsync(
Settings.builder().put(EsExecutors.PROCESSORS, 3).build(),
Settings.builder().put(EsExecutors.PROCESSORS, 6).build()
).get();
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().localNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().localNodeId();
logger.info("--> started nodes: " + server1NodeId + " and " + server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().length, is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
assertThat(response.getNodesMap().get(server1NodeId).getOs().getAvailableProcessors(),
equalTo(Runtime.getRuntime().availableProcessors()));
assertThat(response.getNodesMap().get(server2NodeId).getOs().getAvailableProcessors(),
equalTo(Runtime.getRuntime().availableProcessors()));
assertThat(response.getNodesMap().get(server1NodeId).getOs().getAllocatedProcessors(), equalTo(3));
assertThat(response.getNodesMap().get(server2NodeId).getOs().getAllocatedProcessors(), equalTo(6));
}
}

View File

@ -54,6 +54,11 @@ the operating system:
`os.available_processors`::
Number of processors available to the Java virtual machine
`os.allocated_processors`::
The number of processors actually used to calculate thread pool size. This number can be set
with the `processors` setting of a node and defaults to the number of processors reported by the OS.
In both cases this number will never be larger than 32.
[float]
[[process-info]]
==== Process information