Merge pull request #14040 from xuzha/pending_queue_size_stats

Expose pending cluster state queue size in node stats

closes #13610
This commit is contained in:
Xu Zhang 2015-10-28 11:03:57 -07:00
commit a6bbf73065
22 changed files with 362 additions and 13 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.NodeIndicesStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
@ -78,6 +79,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
@Nullable
private ScriptStats scriptStats;
@Nullable
private DiscoveryStats discoveryStats;
NodeStats() {
}
@ -85,7 +89,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
@Nullable OsStats os, @Nullable ProcessStats process, @Nullable JvmStats jvm, @Nullable ThreadPoolStats threadPool,
@Nullable FsInfo fs, @Nullable TransportStats transport, @Nullable HttpStats http,
@Nullable AllCircuitBreakerStats breaker,
@Nullable ScriptStats scriptStats) {
@Nullable ScriptStats scriptStats,
@Nullable DiscoveryStats discoveryStats) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
@ -98,6 +103,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
this.http = http;
this.breaker = breaker;
this.scriptStats = scriptStats;
this.discoveryStats = discoveryStats;
}
public long getTimestamp() {
@ -177,6 +183,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
return this.scriptStats;
}
@Nullable
public DiscoveryStats getDiscoveryStats() {
return this.discoveryStats;
}
public static NodeStats readNodeStats(StreamInput in) throws IOException {
NodeStats nodeInfo = new NodeStats();
nodeInfo.readFrom(in);
@ -213,6 +224,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
}
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
scriptStats = in.readOptionalStreamable(new ScriptStats());
discoveryStats = in.readOptionalStreamable(new DiscoveryStats(null));
}
@ -270,6 +282,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
}
out.writeOptionalStreamable(breaker);
out.writeOptionalStreamable(scriptStats);
out.writeOptionalStreamable(discoveryStats);
}
@Override
@ -321,6 +334,10 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
getScriptStats().toXContent(builder, params);
}
if (getDiscoveryStats() != null) {
getDiscoveryStats().toXContent(builder, params);
}
return builder;
}
}

View File

@ -41,6 +41,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
private boolean http;
private boolean breaker;
private boolean script;
private boolean discovery;
public NodesStatsRequest() {
}
@ -67,6 +68,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
this.http = true;
this.breaker = true;
this.script = true;
this.discovery = true;
return this;
}
@ -84,6 +86,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
this.http = false;
this.breaker = false;
this.script = false;
this.discovery = false;
return this;
}
@ -234,6 +237,20 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
return this;
}
public boolean discovery() {
return this.discovery;
}
/**
* Should the node's discovery stats be returned.
*/
public NodesStatsRequest discovery(boolean discovery) {
this.discovery = discovery;
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -247,6 +264,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
http = in.readBoolean();
breaker = in.readBoolean();
script = in.readBoolean();
discovery = in.readBoolean();
}
@Override
@ -262,6 +280,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
out.writeBoolean(http);
out.writeBoolean(breaker);
out.writeBoolean(script);
out.writeBoolean(discovery);
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.admin.cluster.node.stats;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.support.nodes.NodesOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
@ -130,4 +129,12 @@ public class NodesStatsRequestBuilder extends NodesOperationRequestBuilder<Nodes
request.http(http);
return this;
}
/**
* Should the discovery stats be returned.
*/
public NodesStatsRequestBuilder setDiscovery(boolean discovery) {
request.discovery(discovery);
return this;
}
}

View File

@ -80,7 +80,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest request = nodeStatsRequest.request;
return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(),
request.fs(), request.transport(), request.http(), request.breaker(), request.script());
request.fs(), request.transport(), request.http(), request.breaker(), request.script(), request.discovery());
}
@Override

View File

@ -101,7 +101,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, true, false, true);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {

View File

@ -87,4 +87,10 @@ public interface Discovery extends LifecycleComponent<Discovery> {
super(msg, cause, args);
}
}
/**
* @return stats about the discovery
*/
DiscoveryStats stats();
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
import java.io.IOException;
public class DiscoveryStats implements Streamable, ToXContent {
@Nullable
private PendingClusterStateStats queueStats;
public DiscoveryStats(PendingClusterStateStats queueStats) {
this.queueStats = queueStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DISCOVERY);
if (queueStats != null ){
queueStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
queueStats = new PendingClusterStateStats();
queueStats.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (queueStats != null ) {
out.writeBoolean(true);
queueStats.writeTo(out);
}else{
out.writeBoolean(false);
}
}
static final class Fields {
static final XContentBuilderString DISCOVERY = new XContentBuilderString("discovery");
}
public PendingClusterStateStats getQueueStats() {
return queueStats;
}
}

View File

@ -316,6 +316,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
}
@Override
public DiscoveryStats stats() {
return new DiscoveryStats(null);
}
private LocalDiscovery[] members() {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {

View File

@ -43,6 +43,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.publish.PendingClusterStateStats;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
@ -337,6 +339,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
@Override
public DiscoveryStats stats() {
PendingClusterStateStats queueStats = publishClusterState.pendingStatesQueue().stats();
return new DiscoveryStats(queueStats);
}
/**
* returns true if zen discovery is started and there is a currently a background thread active for (re)joining
* the cluster used for testing.

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
* Class encapsulating stats about the PendingClusterStatsQueue
*/
public class PendingClusterStateStats implements Streamable, ToXContent {
private int total;
private int pending;
private int committed;
public PendingClusterStateStats() {
}
public PendingClusterStateStats(int total, int pending, int committed) {
this.total = total;
this.pending = pending;
this.committed = committed;
}
public int getCommitted() {
return committed;
}
public int getPending() {
return pending;
}
public int getTotal() {
return total;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.QUEUE);
builder.field(Fields.TOTAL, total);
builder.field(Fields.PENDING, pending);
builder.field(Fields.COMMITTED, committed);
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
total = in.readVInt();
pending = in.readVInt();
committed = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeVInt(pending);
out.writeVInt(committed);
}
static final class Fields {
static final XContentBuilderString QUEUE = new XContentBuilderString("cluster_state_queue");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString PENDING = new XContentBuilderString("pending");
static final XContentBuilderString COMMITTED = new XContentBuilderString("committed");
}
@Override
public String toString() {
return "PendingClusterStateStats(total=" + total + ", pending=" + pending + ", committed=" + committed + ")";
}
}

View File

@ -283,4 +283,17 @@ public class PendingClusterStatesQueue {
}
}
public synchronized PendingClusterStateStats stats() {
// calculate committed cluster state
int committed = 0;
for (ClusterStateContext clusterStatsContext : pendingStates) {
if (clusterStatsContext.committed()) {
committed += 1;
}
}
return new PendingClusterStateStats(pendingStates.size(), pendingStates.size() - committed, committed);
}
}

View File

@ -152,13 +152,14 @@ public class NodeService extends AbstractComponent {
transportService.stats(),
httpServer == null ? null : httpServer.stats(),
circuitBreakerService.stats(),
scriptService.stats()
scriptService.stats(),
discovery.stats()
);
}
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean fs, boolean transport, boolean http, boolean circuitBreaker,
boolean script) {
boolean script, boolean discoveryStats) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(discovery.localNode(), System.currentTimeMillis(),
@ -171,7 +172,8 @@ public class NodeService extends AbstractComponent {
transport ? transportService.stats() : null,
http ? (httpServer == null ? null : httpServer.stats()) : null,
circuitBreaker ? circuitBreakerService.stats() : null,
script ? scriptService.stats() : null
script ? scriptService.stats() : null,
discoveryStats ? discovery.stats() : null
);
}
}

View File

@ -77,6 +77,7 @@ public class RestNodesStatsAction extends BaseRestHandler {
nodesStatsRequest.process(metrics.contains("process"));
nodesStatsRequest.breaker(metrics.contains("breaker"));
nodesStatsRequest.script(metrics.contains("script"));
nodesStatsRequest.discovery(metrics.contains("discovery"));
// check for index specific metrics
if (metrics.contains("indices")) {

View File

@ -141,11 +141,11 @@ public class DiskUsageTests extends ESTestCase {
};
NodeStats[] nodeStats = new NodeStats[] {
new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null),
null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null),
new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null),
null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null),
new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null)
null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null)
};
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");

View File

@ -73,7 +73,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
null, null, null, null, null,
fsInfo,
null, null, null,
null);
null, null);
}
@Inject

View File

@ -116,6 +116,11 @@ public class DiscoveryModuleTests extends ModuleTestCase {
}
@Override
public DiscoveryStats stats() {
return null;
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@ -34,7 +35,11 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
@ -256,4 +261,37 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
assertThat("Can't join master because version 1.6.0 is lower than the minimum compatable version 2.0.0 can support", electMasterService.electMaster(Collections.singletonList(node)), nullValue());
}
public void testDiscoveryStats() throws IOException {
String expectedStatsJsonResponse = "{\n" +
" \"discovery\" : {\n" +
" \"cluster_state_queue\" : {\n" +
" \"total\" : 0,\n" +
" \"pending\" : 0,\n" +
" \"committed\" : 0\n" +
" }\n" +
" }\n" +
"}";
Settings nodeSettings = Settings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.build();
internalCluster().startNode(nodeSettings);
logger.info("--> request node discovery stats");
NodesStatsResponse statsResponse = client().admin().cluster().prepareNodesStats().clear().setDiscovery(true).get();
assertThat(statsResponse.getNodes().length, equalTo(1));
DiscoveryStats stats = statsResponse.getNodes()[0].getDiscoveryStats();
assertThat(stats.getQueueStats(), notNullValue());
assertThat(stats.getQueueStats().getTotal(), equalTo(0));
assertThat(stats.getQueueStats().getCommitted(), equalTo(0));
assertThat(stats.getQueueStats().getPending(), equalTo(0));
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertThat(builder.string(), equalTo(expectedStatsJsonResponse));
}
}

View File

@ -162,6 +162,31 @@ public class PendingClusterStatesQueueTests extends ESTestCase {
}
}
public void testQueueStats() {
List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 100), "master");
PendingClusterStatesQueue queue = createQueueWithStates(states);
assertThat(queue.stats().getTotal(), equalTo(states.size()));
assertThat(queue.stats().getPending(), equalTo(states.size()));
assertThat(queue.stats().getCommitted(), equalTo(0));
List<ClusterStateContext> committedContexts = randomCommitStates(queue);
assertThat(queue.stats().getTotal(), equalTo(states.size()));
assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
assertThat(queue.stats().getCommitted(), equalTo(committedContexts.size()));
ClusterState highestCommitted = null;
for (ClusterStateContext context : committedContexts) {
if (highestCommitted == null || context.state.supersedes(highestCommitted)) {
highestCommitted = context.state;
}
}
queue.markAsProcessed(highestCommitted);
assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size()));
assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
assertThat(queue.stats().getCommitted(), equalTo(0));
}
protected List<ClusterStateContext> randomCommitStates(PendingClusterStatesQueue queue) {
List<ClusterStateContext> committedContexts = new ArrayList<>();
for (int iter = randomInt(queue.pendingStates.size() - 1); iter >= 0; iter--) {

View File

@ -1873,7 +1873,7 @@ public final class InternalTestCluster extends TestCluster {
}
NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false);
NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false, false);
assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0l));
assertThat("Query cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0l));
assertThat("FixedBitSet cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getSegments().getBitsetMemoryInBytes(), equalTo(0l));

View File

@ -57,6 +57,9 @@ of `indices`, `os`, `process`, `jvm`, `transport`, `http`,
`breaker`::
Statistics about the field data circuit breaker
`discovery`::
Statistics about the discovery
[source,js]
--------------------------------------------------
# return indices and os

View File

@ -15,7 +15,7 @@
"parts": {
"metric" : {
"type" : "list",
"options" : ["_all", "breaker", "fs", "http", "indices", "jvm", "os", "process", "thread_pool", "transport"],
"options" : ["_all", "breaker", "fs", "http", "indices", "jvm", "os", "process", "thread_pool", "transport", "discovery"],
"description" : "Limit the information returned to the specified metrics"
},
"index_metric" : {

View File

@ -0,0 +1,25 @@
---
"Discovery stats":
- do:
cluster.state: {}
# Get master node id
- set: { master_node: master }
- do:
nodes.stats:
metric: [ discovery ]
- is_true: cluster_name
- is_true: nodes
- is_true: nodes.$master.discovery
- do:
nodes.stats:
filter_path: "nodes.*.discovery"
- is_false: cluster_name
- is_true: nodes
- is_false: nodes.$master.name
- is_false: nodes.$master.jvm
- is_true: nodes.$master.discovery