Add total_indexing_buffer/_in_bytes to nodes info API

This commit is contained in:
Mike McCandless 2016-06-16 04:39:34 -04:00
parent 01004c72ba
commit 3f221bf7cb
7 changed files with 93 additions and 9 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.ingest.core.IngestInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
@ -78,12 +79,16 @@ public class NodeInfo extends BaseNodeResponse {
@Nullable
private IngestInfo ingest;
@Nullable
private ByteSizeValue totalIndexingBuffer;
public NodeInfo() {
}
public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Map<String, String> serviceAttributes, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest) {
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, @Nullable IngestInfo ingest,
ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
@ -97,6 +102,7 @@ public class NodeInfo extends BaseNodeResponse {
this.http = http;
this.plugins = plugins;
this.ingest = ingest;
this.totalIndexingBuffer = totalIndexingBuffer;
}
/**
@ -186,6 +192,11 @@ public class NodeInfo extends BaseNodeResponse {
return ingest;
}
@Nullable
public ByteSizeValue getTotalIndexingBuffer() {
return totalIndexingBuffer;
}
public static NodeInfo readNodeInfo(StreamInput in) throws IOException {
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.readFrom(in);
@ -197,6 +208,7 @@ public class NodeInfo extends BaseNodeResponse {
super.readFrom(in);
version = Version.readVersion(in);
build = Build.readBuild(in);
totalIndexingBuffer = new ByteSizeValue(in.readLong());
if (in.readBoolean()) {
Map<String, String> builder = new HashMap<>();
int size = in.readVInt();
@ -240,6 +252,11 @@ public class NodeInfo extends BaseNodeResponse {
super.writeTo(out);
out.writeVInt(version.id);
Build.writeBuild(build, out);
if (totalIndexingBuffer == null) {
out.writeLong(0);
} else {
out.writeLong(totalIndexingBuffer.bytes());
}
if (getServiceAttributes() == null) {
out.writeBoolean(false);
} else {

View File

@ -69,6 +69,8 @@ public class NodesInfoResponse extends BaseNodesResponse<NodeInfo> implements To
builder.field("version", nodeInfo.getVersion());
builder.field("build_hash", nodeInfo.getBuild().shortHash());
builder.field("total_indexing_buffer_in_bytes", nodeInfo.getTotalIndexingBuffer().bytes());
builder.field("total_indexing_buffer", nodeInfo.getTotalIndexingBuffer());
if (nodeInfo.getServiceAttributes() != null) {
for (Map.Entry<String, String> nodeAttribute : nodeInfo.getServiceAttributes().entrySet()) {

View File

@ -27,11 +27,11 @@ import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.fieldstats.FieldStats;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -53,9 +53,10 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -1140,6 +1141,10 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
}
}
public ByteSizeValue getTotalIndexingBufferBytes() {
return indexingMemoryController.indexingBufferSize();
}
/**
* Cache something calculated at the shard level.
* @param shard the shard this item is part of

View File

@ -135,7 +135,8 @@ public class NodeService extends AbstractComponent implements Closeable {
transportService.info(),
httpServer == null ? null : httpServer.info(),
pluginService == null ? null : pluginService.info(),
ingestService == null ? null : ingestService.info()
ingestService == null ? null : ingestService.info(),
indicesService == null ? null : indicesService.getTotalIndexingBufferBytes()
);
}
@ -150,7 +151,8 @@ public class NodeService extends AbstractComponent implements Closeable {
transport ? transportService.info() : null,
http ? (httpServer == null ? null : httpServer.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null
ingest ? (ingestService == null ? null : ingestService.info()) : null,
indicesService == null ? null : indicesService.getTotalIndexingBufferBytes()
);
}

View File

@ -141,6 +141,6 @@ public class NodeInfoStreamingTests extends ESTestCase {
plugins.addModule(DummyPluginInfo.INSTANCE);
plugins.addPlugin(DummyPluginInfo.INSTANCE);
IngestInfo ingestInfo = new IngestInfo(Collections.emptyList());
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo);
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo, null);
}
}

View File

@ -24,14 +24,15 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.List;
import static org.elasticsearch.client.Requests.nodesInfoRequest;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ -80,6 +81,29 @@ public class SimpleNodesInfoIT extends ESIntegTestCase {
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
}
public void testNodesInfosTotalIndexingBuffer() throws Exception {
List<String> nodesIds = internalCluster().startNodesAsync(2).get();
final String node_1 = nodesIds.get(0);
final String node_2 = nodesIds.get(1);
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
String server1NodeId = internalCluster().getInstance(ClusterService.class, node_1).state().nodes().getLocalNodeId();
String server2NodeId = internalCluster().getInstance(ClusterService.class, node_2).state().nodes().getLocalNodeId();
logger.info("--> started nodes: {} and {}", server1NodeId, server2NodeId);
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().execute().actionGet();
assertThat(response.getNodes().size(), is(2));
assertThat(response.getNodesMap().get(server1NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server1NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
assertThat(response.getNodesMap().get(server2NodeId), notNullValue());
assertNotNull(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer());
assertThat(response.getNodesMap().get(server2NodeId).getTotalIndexingBuffer().bytes(), greaterThan(0L));
}
public void testAllocatedProcessors() throws Exception {
List<String> nodesIds = internalCluster().
startNodesAsync(

View File

@ -15,7 +15,41 @@ The second command selectively retrieves nodes information of only
`nodeId1` and `nodeId2`. All the nodes selective options are explained
<<cluster-nodes,here>>.
By default, it just returns all attributes and core settings for a node.
By default, it just returns all attributes and core settings for a node:
[float]
[[core-info]]
`build_hash`::
Short hash of the last git commit in this release.
`host`::
The node's host name.
`http_address`::
Host and port where primary HTTP connections are accepted.
`ip`::
The node's IP address.
`name`::
The node's name.
`total_indexing_buffer`::
Total heap allowed to be used to hold recently indexed
documents before they must be written to disk. This size is
a shared pool across all shards on this node, and is
controlled by <<indexing-buffer,Indexing Buffer settings>>.
`total_indexing_buffer_in_bytes`::
Same as `total_indexing_buffer`, but expressed in bytes.
`transport_address`::
Host and port where transport HTTP connections are accepted.
`version`::
Elasticsearch version running on this node.
It also allows to get only information on `settings`, `os`, `process`, `jvm`,
`thread_pool`, `transport`, `http`, `plugins` and `ingest`: