Merge pull request #18914 from mikemccand/node_info_indexing_buffer

Add total_indexing_buffer/_in_bytes to nodes info API
This commit is contained in:
Michael McCandless 2016-06-22 17:13:35 -04:00 committed by GitHub
commit 14025aaef8
12 changed files with 153 additions and 14 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
@ -78,12 +79,16 @@ public class NodeInfo extends BaseNodeResponse {
@Nullable
private IngestInfo ingest;
@Nullable
private ByteSizeValue totalIndexingBuffer;
public NodeInfo() {
}
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Map<String, String> serviceAttributes, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest) {
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest,
@Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
@ -97,6 +102,7 @@ public class NodeInfo extends BaseNodeResponse {
this.http = http;
this.plugins = plugins;
this.ingest = ingest;
this.totalIndexingBuffer = totalIndexingBuffer;
}
/**
@ -186,6 +192,11 @@ public class NodeInfo extends BaseNodeResponse {
return ingest;
}
@Nullable
public ByteSizeValue getTotalIndexingBuffer() {
return totalIndexingBuffer;
}
public static NodeInfo readNodeInfo(StreamInput in) throws IOException {
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.readFrom(in);
@ -197,6 +208,11 @@ public class NodeInfo extends BaseNodeResponse {
super.readFrom(in);
version = Version.readVersion(in);
build = Build.readBuild(in);
if (in.readBoolean()) {
totalIndexingBuffer = new ByteSizeValue(in.readLong());
} else {
totalIndexingBuffer = null;
}
if (in.readBoolean()) {
Map<String, String> builder = new HashMap<>();
int size = in.readVInt();
@ -240,6 +256,12 @@ public class NodeInfo extends BaseNodeResponse {
super.writeTo(out);
out.writeVInt(version.id);
Build.writeBuild(build, out);
if (totalIndexingBuffer == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeLong(totalIndexingBuffer.bytes());
}
if (getServiceAttributes() == null) {
out.writeBoolean(false);
} else {

View File

@ -39,6 +39,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
private boolean http = true;
private boolean plugins = true;
private boolean ingest = true;
private boolean indices = true;
public NodesInfoRequest() {
}
@ -64,6 +65,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
http = false;
plugins = false;
ingest = false;
indices = false;
return this;
}
@ -80,6 +82,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
http = true;
plugins = true;
ingest = true;
indices = true;
return this;
}
@ -221,6 +224,22 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
return ingest;
}
/**
* Should information about indices (currently just indexing buffers) be returned
* @param indices true if you want info
*/
public NodesInfoRequest indices(boolean indices) {
this.indices = indices;
return this;
}
/**
* @return true if information about indices (currently just indexing buffers)
*/
public boolean indices() {
return indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -233,6 +252,7 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
http = in.readBoolean();
plugins = in.readBoolean();
ingest = in.readBoolean();
indices = in.readBoolean();
}
@Override
@ -247,5 +267,6 @@ public class NodesInfoRequest extends BaseNodesRequest<NodesInfoRequest> {
out.writeBoolean(http);
out.writeBoolean(plugins);
out.writeBoolean(ingest);
out.writeBoolean(indices);
}
}

View File

@ -118,4 +118,12 @@ public class NodesInfoRequestBuilder extends NodesOperationRequestBuilder<NodesI
request().ingest(ingest);
return this;
}
/**
* Should the node indices info be returned.
*/
public NodesInfoRequestBuilder setIndices(boolean indices) {
request().indices(indices);
return this;
}
}

View File

@ -69,6 +69,9 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
builder.field("version", nodeInfo.getVersion());
builder.field("build_hash", nodeInfo.getBuild().shortHash());
if (nodeInfo.getTotalIndexingBuffer() != null) {
builder.byteSizeField("total_indexing_buffer", "total_indexing_buffer_in_bytes", nodeInfo.getTotalIndexingBuffer());
}
if (nodeInfo.getServiceAttributes() != null) {
for (Map.Entry<String, String> nodeAttribute : nodeInfo.getServiceAttributes().entrySet()) {

View File

@ -76,7 +76,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
NodesInfoRequest request = nodeRequest.request;
return nodeService.info(request.settings(), request.os(), request.process(), request.jvm(), request.threadPool(),
request.transport(), request.http(), request.plugins(), request.ingest());
request.transport(), request.http(), request.plugins(), request.ingest(), request.indices());
}
@Override

View File

@ -91,7 +91,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, true, false, true, false);
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {

View File

@ -27,11 +27,11 @@ import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -53,9 +53,10 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -1140,6 +1141,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
}
}
public ByteSizeValue getTotalIndexingBufferBytes() {
return indexingMemoryController.indexingBufferSize();
}
/**
* Cache something calculated at the shard level.
* @param shard the shard this item is part of

View File

@ -132,12 +132,13 @@ public class NodeService extends AbstractComponent implements Closeable {
transportService.info(),
httpServer == null ? null : httpServer.info(),
pluginService == null ? null : pluginService.info(),
ingestService == null ? null : ingestService.info()
ingestService == null ? null : ingestService.info(),
indicesService.getTotalIndexingBufferBytes()
);
}
public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest) {
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, discovery.localNode(), serviceAttributes,
settings ? settingsFilter.filter(this.settings) : null,
os ? monitorService.osService().info() : null,
@ -147,7 +148,8 @@ public class NodeService extends AbstractComponent implements Closeable {
transport ? transportService.info() : null,
http ? (httpServer == null ? null : httpServer.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null
ingest ? (ingestService == null ? null : ingestService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
);
}

View File

@ -43,7 +43,7 @@ import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestNodesInfoAction extends BaseRestHandler {
private final SettingsFilter settingsFilter;
private final static Set<String> ALLOWED_METRICS = Sets.newHashSet("http", "jvm", "os", "plugins", "process", "settings", "thread_pool", "transport", "ingest");
private final static Set<String> ALLOWED_METRICS = Sets.newHashSet("http", "jvm", "os", "plugins", "process", "settings", "thread_pool", "transport", "ingest", "indices");
@Inject
public RestNodesInfoAction(Settings settings, RestController controller, Client client, SettingsFilter settingsFilter) {
@ -97,6 +97,7 @@ public class RestNodesInfoAction extends BaseRestHandler {
nodesInfoRequest.http(metrics.contains("http"));
nodesInfoRequest.plugins(metrics.contains("plugins"));
nodesInfoRequest.ingest(metrics.contains("ingest"));
nodesInfoRequest.indices(metrics.contains("indices"));
}
settingsFilter.addFilterSettingParams(request);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpInfo;
@ -141,6 +142,13 @@ public class NodeInfoStreamingTests extends ESTestCase {
plugins.addModule(DummyPluginInfo.INSTANCE);
plugins.addPlugin(DummyPluginInfo.INSTANCE);
IngestInfo ingestInfo = new IngestInfo(Collections.emptyList());
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo);
ByteSizeValue indexingBuffer;
if (random().nextBoolean()) {
indexingBuffer = null;
} else {
// pick a random long that sometimes exceeds an int:
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
}
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo, indexingBuffer);
}
}

View File

@ -24,14 +24,15 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.List;
import static org.elasticsearch.client.Requests.nodesInfoRequest;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -80,6 +81,40 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}
public void testNodesInfosTotalIndexingBuffer() throws Exception {
List<String> nodesIds = internalCluster().startNodesAsync(2).get();
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
// again, using only the indices flag
response = client().admin().cluster().prepareNodesInfo().clear().setIndices(true).execute().actionGet();
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
}
public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().
startNodesAsync(

View File

@ -15,9 +15,43 @@ The second command selectively retrieves nodes information of only
`nodeId1` and `nodeId2`. All the nodes selective options are explained
<<cluster-nodes,here>>.
By default, it just returns all attributes and core settings for a node.
By default, it just returns all attributes and core settings for a node:
[float]
[[core-info]]
`build_hash`::
Short hash of the last git commit in this release.
`host`::
The node's host name.
`http_address`::
Host and port where primary HTTP connections are accepted.
`ip`::
The node's IP address.
`name`::
The node's name.
`total_indexing_buffer`::
Total heap allowed to be used to hold recently indexed
documents before they must be written to disk. This size is
a shared pool across all shards on this node, and is
controlled by <<indexing-buffer,Indexing Buffer settings>>.
`total_indexing_buffer_in_bytes`::
Same as `total_indexing_buffer`, but expressed in bytes.
`transport_address`::
Host and port where transport HTTP connections are accepted.
`version`::
Elasticsearch version running on this node.
It also allows to get only information on `settings`, `os`, `process`, `jvm`,
`thread_pool`, `transport`, `http`, `plugins` and `ingest`:
`thread_pool`, `transport`, `http`, `plugins`, `ingest` and `indices`:
[source,js]
--------------------------------------------------