diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java index 37e916d65fb..7dab4235cde 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -26,33 +27,48 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import java.io.IOException; public class DiscoveryStats implements Writeable, ToXContentFragment { - @Nullable private final PendingClusterStateStats queueStats; + private final PublishClusterStateStats publishStats; - public DiscoveryStats(PendingClusterStateStats queueStats) { + public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) { this.queueStats = queueStats; + this.publishStats = publishStats; } public DiscoveryStats(StreamInput in) throws IOException { queueStats = in.readOptionalWriteable(PendingClusterStateStats::new); + + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + publishStats = in.readOptionalWriteable(PublishClusterStateStats::new); + } else { + publishStats = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(queueStats); + + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeOptionalWriteable(publishStats); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.DISCOVERY); - if (queueStats != null ){ + if (queueStats != null) { queueStats.toXContent(builder, params); } + if (publishStats != null) { + publishStats.toXContent(builder, params); + } builder.endObject(); return builder; } @@ -64,4 +80,8 @@ public class DiscoveryStats implements Writeable, ToXContentFragment { public PendingClusterStateStats getQueueStats() { return queueStats; } + + public PublishClusterStateStats getPublishStats() { + return publishStats; + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index 2f3124010cc..2a32caabc77 100644 --- a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -21,7 +21,6 @@ package org.elasticsearch.discovery.single; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -34,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -94,7 +94,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D @Override public DiscoveryStats stats() { - return new DiscoveryStats((PendingClusterStateStats) null); + return new DiscoveryStats(null, null); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index ae469d162ae..95de654928e 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class PublishClusterStateAction extends AbstractComponent { @@ -90,6 +91,10 @@ public class PublishClusterStateAction extends AbstractComponent { private final IncomingClusterStateListener incomingClusterStateListener; private final DiscoverySettings discoverySettings; + private final AtomicLong fullClusterStateReceivedCount = new AtomicLong(); + private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong(); + private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong(); + public PublishClusterStateAction( Settings settings, TransportService transportService, @@ -380,11 +385,13 @@ public class PublishClusterStateAction extends AbstractComponent { // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); + fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { Diff diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode()); incomingState = diff.apply(lastSeenClusterState); + compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); } else { @@ -394,6 +401,9 @@ public class PublishClusterStateAction extends AbstractComponent { incomingClusterStateListener.onIncomingClusterState(incomingState); lastSeenClusterState = incomingState; } + } catch (IncompatibleClusterStateVersionException e) { + incompatibleClusterStateDiffReceivedCount.incrementAndGet(); + throw e; } finally { IOUtils.close(in); } @@ -636,4 +646,11 @@ public class PublishClusterStateAction extends AbstractComponent { publishingTimedOut.set(isTimedOut); } } + + public PublishClusterStateStats stats() { + return new PublishClusterStateStats( + fullClusterStateReceivedCount.get(), + incompatibleClusterStateDiffReceivedCount.get(), + compatibleClusterStateDiffReceivedCount.get()); + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java new file mode 100644 index 00000000000..8a848198759 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Class encapsulating stats about the PublishClusterStateAction + */ +public class PublishClusterStateStats implements Writeable, ToXContentObject { + + private final long fullClusterStateReceivedCount; + private final long incompatibleClusterStateDiffReceivedCount; + private final long compatibleClusterStateDiffReceivedCount; + + /** + * @param fullClusterStateReceivedCount the number of times this node has received a full copy of the cluster state from the master. + * @param incompatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master. + * @param compatibleClusterStateDiffReceivedCount the number of times that received cluster-state diffs were compatible with + */ + public PublishClusterStateStats(long fullClusterStateReceivedCount, + long incompatibleClusterStateDiffReceivedCount, + long compatibleClusterStateDiffReceivedCount) { + this.fullClusterStateReceivedCount = fullClusterStateReceivedCount; + this.incompatibleClusterStateDiffReceivedCount = incompatibleClusterStateDiffReceivedCount; + this.compatibleClusterStateDiffReceivedCount = compatibleClusterStateDiffReceivedCount; + } + + public PublishClusterStateStats(StreamInput in) throws IOException { + fullClusterStateReceivedCount = in.readVLong(); + incompatibleClusterStateDiffReceivedCount = in.readVLong(); + compatibleClusterStateDiffReceivedCount = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(fullClusterStateReceivedCount); + out.writeVLong(incompatibleClusterStateDiffReceivedCount); + out.writeVLong(compatibleClusterStateDiffReceivedCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("published_cluster_states"); + { + builder.field("full_states", fullClusterStateReceivedCount); + builder.field("incompatible_diffs", incompatibleClusterStateDiffReceivedCount); + builder.field("compatible_diffs", compatibleClusterStateDiffReceivedCount); + } + builder.endObject(); + return builder; + } + + long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; } + + long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; } + + long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; } + + @Override + public String toString() { + return "PublishClusterStateStats(full=" + fullClusterStateReceivedCount + + ", incompatible=" + incompatibleClusterStateDiffReceivedCount + + ", compatible=" + compatibleClusterStateDiffReceivedCount + + ")"; + } +} diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index ba5d5213e17..d688a5d5cdb 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -412,8 +412,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override public DiscoveryStats stats() { - PendingClusterStateStats queueStats = pendingStatesQueue.stats(); - return new DiscoveryStats(queueStats); + return new DiscoveryStats(pendingStatesQueue.stats(), publishClusterState.stats()); } public DiscoverySettings getDiscoverySettings() { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 9d01b74213e..d9aed454732 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.indices.breaker.CircuitBreakerStats; @@ -411,8 +412,18 @@ public class NodeStatsTests extends ESTestCase { } ScriptStats scriptStats = frequently() ? new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null; - DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats(randomBoolean() ? new PendingClusterStateStats(randomInt(), - randomInt(), randomInt()) : null) : null; + DiscoveryStats discoveryStats = frequently() + ? new DiscoveryStats( + randomBoolean() + ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) + : null, + randomBoolean() + ? new PublishClusterStateStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()) + : null) + : null; IngestStats ingestStats = null; if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index bc73d5b3bc3..9693a1baadc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -705,6 +705,73 @@ public class PublishClusterStateActionTests extends ESTestCase { } } + private void assertPublishClusterStateStats(String description, MockNode node, long expectedFull, long expectedIncompatibleDiffs, + long expectedCompatibleDiffs) { + PublishClusterStateStats stats = node.action.stats(); + assertThat(description + ": full cluster states", stats.getFullClusterStateReceivedCount(), equalTo(expectedFull)); + assertThat(description + ": incompatible cluster state diffs", stats.getIncompatibleClusterStateDiffReceivedCount(), + equalTo(expectedIncompatibleDiffs)); + assertThat(description + ": compatible cluster state diffs", stats.getCompatibleClusterStateDiffReceivedCount(), + equalTo(expectedCompatibleDiffs)); + } + + public void testPublishClusterStateStats() throws Exception { + MockNode nodeA = createMockNode("nodeA").setAsMaster(); + MockNode nodeB = createMockNode("nodeB"); + + assertPublishClusterStateStats("nodeA: initial state", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: initial state", nodeB, 0, 0, 0); + + // Initial cluster state + ClusterState clusterState = nodeA.clusterState; + + // cluster state update - add nodeB + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).add(nodeB.discoveryNode).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent as a full cluster state update + assertPublishClusterStateStats("nodeA: after full update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after full update", nodeB, 1, 0, 0); + + // Increment cluster state version + previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent, successfully, as a cluster state diff + assertPublishClusterStateStats("nodeA: after successful diff update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after successful diff update", nodeB, 1, 0, 1); + + // Increment cluster state version twice + previousClusterState = ClusterState.builder(clusterState).incrementVersion().build(); + clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent, unsuccessfully, as a diff and then retried as a full update + assertPublishClusterStateStats("nodeA: after unsuccessful diff update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after unsuccessful diff update", nodeB, 2, 1, 1); + + // node A steps down from being master + nodeA.resetMasterId(); + nodeB.resetMasterId(); + + // node B becomes the master and sends a version of the cluster state that goes back + discoveryNodes = DiscoveryNodes.builder(discoveryNodes) + .add(nodeA.discoveryNode) + .add(nodeB.discoveryNode) + .masterNodeId(nodeB.discoveryNode.getId()) + .localNodeId(nodeB.discoveryNode.getId()) + .build(); + previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); + publishStateAndWait(nodeB.action, clusterState, previousClusterState); + + // Sent, unsuccessfully, as a diff, and then retried as a full update + assertPublishClusterStateStats("nodeA: B became master", nodeA, 1, 1, 0); + assertPublishClusterStateStats("nodeB: B became master", nodeB, 2, 1, 1); + } private MetaData buildMetaDataForVersion(MetaData metaData, long version) { ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(metaData.indices()); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index 7821d4fd944..ed13f34b609 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import org.hamcrest.Matchers; import java.io.IOException; import java.net.UnknownHostException; @@ -255,6 +254,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase { " \"total\" : 0,\n" + " \"pending\" : 0,\n" + " \"committed\" : 0\n" + + " },\n" + + " \"published_cluster_states\" : {\n" + + " \"full_states\" : 0,\n" + + " \"incompatible_diffs\" : 0,\n" + + " \"compatible_diffs\" : 0\n" + " }\n" + " }\n" + "}"; @@ -275,6 +279,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase { assertThat(stats.getQueueStats().getCommitted(), equalTo(0)); assertThat(stats.getQueueStats().getPending(), equalTo(0)); + assertThat(stats.getPublishStats(), notNullValue()); + assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L)); + assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L)); + assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L)); + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); builder.startObject(); stats.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml index 2617f76941c..629c8de164d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml @@ -1,5 +1,8 @@ --- "Discovery stats": + - skip: + version: " - 6.99.99" + reason: "published_cluster_states_received is not (yet) in 6.x" - do: cluster.state: {} @@ -15,6 +18,11 @@ - is_true: nodes.$master.name - is_false: nodes.$master.jvm - is_true: nodes.$master.discovery + - is_true: nodes.$master.discovery.cluster_state_queue + - is_true: nodes.$master.discovery.published_cluster_states + - gte: { nodes.$master.discovery.published_cluster_states.full_states: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 } - is_true: nodes.$master.roles - do: @@ -26,4 +34,9 @@ - is_false: nodes.$master.name - is_false: nodes.$master.jvm - is_true: nodes.$master.discovery + - is_true: nodes.$master.discovery.cluster_state_queue + - is_true: nodes.$master.discovery.published_cluster_states + - gte: { nodes.$master.discovery.published_cluster_states.full_states: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 } - is_false: nodes.$master.roles