From cc3364e4f8332371fc093b3f815992d13d6d5001 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 25 Oct 2017 07:35:25 +0100 Subject: [PATCH] Stats to record how often the ClusterState diff mechanism is used successfully (#26973) It's believed that using diffs obsoletes the other mechanism for reusing the bits of the ClusterState that didn't change between updates, but in fact we don't know for sure how often the diff mechanism works successfully. The stats collected here will tell us. --- .../discovery/DiscoveryStats.java | 26 +++++- .../discovery/single/SingleNodeDiscovery.java | 4 +- .../zen/PublishClusterStateAction.java | 17 ++++ .../zen/PublishClusterStateStats.java | 90 +++++++++++++++++++ .../discovery/zen/ZenDiscovery.java | 3 +- .../cluster/node/stats/NodeStatsTests.java | 15 +++- .../zen/PublishClusterStateActionTests.java | 67 ++++++++++++++ .../discovery/zen/ZenDiscoveryIT.java | 11 ++- .../test/nodes.stats/30_discovery.yml | 13 +++ 9 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java index 37e916d65fb..7dab4235cde 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -26,33 +27,48 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import 
org.elasticsearch.discovery.zen.PublishClusterStateStats; import java.io.IOException; public class DiscoveryStats implements Writeable, ToXContentFragment { - @Nullable private final PendingClusterStateStats queueStats; + private final PublishClusterStateStats publishStats; - public DiscoveryStats(PendingClusterStateStats queueStats) { + public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) { this.queueStats = queueStats; + this.publishStats = publishStats; } public DiscoveryStats(StreamInput in) throws IOException { queueStats = in.readOptionalWriteable(PendingClusterStateStats::new); + + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + publishStats = in.readOptionalWriteable(PublishClusterStateStats::new); + } else { + publishStats = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(queueStats); + + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeOptionalWriteable(publishStats); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.DISCOVERY); - if (queueStats != null ){ + if (queueStats != null) { queueStats.toXContent(builder, params); } + if (publishStats != null) { + publishStats.toXContent(builder, params); + } builder.endObject(); return builder; } @@ -64,4 +80,8 @@ public class DiscoveryStats implements Writeable, ToXContentFragment { public PendingClusterStateStats getQueueStats() { return queueStats; } + + public PublishClusterStateStats getPublishStats() { + return publishStats; + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index 2f3124010cc..2a32caabc77 100644 --- a/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ 
b/core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -21,7 +21,6 @@ package org.elasticsearch.discovery.single; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -34,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -94,7 +94,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D @Override public DiscoveryStats stats() { - return new DiscoveryStats((PendingClusterStateStats) null); + return new DiscoveryStats(null, null); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java index ae469d162ae..95de654928e 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java @@ -65,6 +65,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class PublishClusterStateAction extends AbstractComponent { @@ -90,6 +91,10 @@ public class PublishClusterStateAction extends AbstractComponent { private final IncomingClusterStateListener incomingClusterStateListener; private final DiscoverySettings discoverySettings; + private 
final AtomicLong fullClusterStateReceivedCount = new AtomicLong(); + private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong(); + private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong(); + public PublishClusterStateAction( Settings settings, TransportService transportService, @@ -380,11 +385,13 @@ public class PublishClusterStateAction extends AbstractComponent { // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); + fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); } else if (lastSeenClusterState != null) { Diff diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode()); incomingState = diff.apply(lastSeenClusterState); + compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); } else { @@ -394,6 +401,9 @@ public class PublishClusterStateAction extends AbstractComponent { incomingClusterStateListener.onIncomingClusterState(incomingState); lastSeenClusterState = incomingState; } + } catch (IncompatibleClusterStateVersionException e) { + incompatibleClusterStateDiffReceivedCount.incrementAndGet(); + throw e; } finally { IOUtils.close(in); } @@ -636,4 +646,11 @@ public class PublishClusterStateAction extends AbstractComponent { publishingTimedOut.set(isTimedOut); } } + + public PublishClusterStateStats stats() { + return new PublishClusterStateStats( + fullClusterStateReceivedCount.get(), + incompatibleClusterStateDiffReceivedCount.get(), + compatibleClusterStateDiffReceivedCount.get()); + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java 
b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java new file mode 100644 index 00000000000..8a848198759 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateStats.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Class encapsulating stats about the PublishClusterStateAction + */ +public class PublishClusterStateStats implements Writeable, ToXContentObject { + + private final long fullClusterStateReceivedCount; + private final long incompatibleClusterStateDiffReceivedCount; + private final long compatibleClusterStateDiffReceivedCount; + + /** + * @param fullClusterStateReceivedCount the number of times this node has received a full copy of the cluster state from the master. 
+ * @param incompatibleClusterStateDiffReceivedCount the number of cluster-state diffs this node received but could not apply. + * @param compatibleClusterStateDiffReceivedCount the number of cluster-state diffs this node received and successfully applied. + */ + public PublishClusterStateStats(long fullClusterStateReceivedCount, + long incompatibleClusterStateDiffReceivedCount, + long compatibleClusterStateDiffReceivedCount) { + this.fullClusterStateReceivedCount = fullClusterStateReceivedCount; + this.incompatibleClusterStateDiffReceivedCount = incompatibleClusterStateDiffReceivedCount; + this.compatibleClusterStateDiffReceivedCount = compatibleClusterStateDiffReceivedCount; + } + + public PublishClusterStateStats(StreamInput in) throws IOException { + fullClusterStateReceivedCount = in.readVLong(); + incompatibleClusterStateDiffReceivedCount = in.readVLong(); + compatibleClusterStateDiffReceivedCount = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(fullClusterStateReceivedCount); + out.writeVLong(incompatibleClusterStateDiffReceivedCount); + out.writeVLong(compatibleClusterStateDiffReceivedCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("published_cluster_states"); + { + builder.field("full_states", fullClusterStateReceivedCount); + builder.field("incompatible_diffs", incompatibleClusterStateDiffReceivedCount); + builder.field("compatible_diffs", compatibleClusterStateDiffReceivedCount); + } + builder.endObject(); + return builder; + } + + long getFullClusterStateReceivedCount() { return fullClusterStateReceivedCount; } + + long getIncompatibleClusterStateDiffReceivedCount() { return incompatibleClusterStateDiffReceivedCount; } + + long getCompatibleClusterStateDiffReceivedCount() { return compatibleClusterStateDiffReceivedCount; } + + @Override + public String toString() { + return 
"PublishClusterStateStats(full=" + fullClusterStateReceivedCount + + ", incompatible=" + incompatibleClusterStateDiffReceivedCount + + ", compatible=" + compatibleClusterStateDiffReceivedCount + + ")"; + } +} diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index ba5d5213e17..d688a5d5cdb 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -412,8 +412,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override public DiscoveryStats stats() { - PendingClusterStateStats queueStats = pendingStatesQueue.stats(); - return new DiscoveryStats(queueStats); + return new DiscoveryStats(pendingStatesQueue.stats(), publishClusterState.stats()); } public DiscoverySettings getDiscoverySettings() { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 9d01b74213e..d9aed454732 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.discovery.zen.PendingClusterStateStats; +import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; import org.elasticsearch.indices.breaker.CircuitBreakerStats; @@ -411,8 +412,18 @@ public class NodeStatsTests extends ESTestCase { } ScriptStats scriptStats = frequently() ? 
new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null; - DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats(randomBoolean() ? new PendingClusterStateStats(randomInt(), - randomInt(), randomInt()) : null) : null; + DiscoveryStats discoveryStats = frequently() + ? new DiscoveryStats( + randomBoolean() + ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) + : null, + randomBoolean() + ? new PublishClusterStateStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong()) + : null) + : null; IngestStats ingestStats = null; if (frequently()) { IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index bc73d5b3bc3..9693a1baadc 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -705,6 +705,73 @@ public class PublishClusterStateActionTests extends ESTestCase { } } + private void assertPublishClusterStateStats(String description, MockNode node, long expectedFull, long expectedIncompatibleDiffs, + long expectedCompatibleDiffs) { + PublishClusterStateStats stats = node.action.stats(); + assertThat(description + ": full cluster states", stats.getFullClusterStateReceivedCount(), equalTo(expectedFull)); + assertThat(description + ": incompatible cluster state diffs", stats.getIncompatibleClusterStateDiffReceivedCount(), + equalTo(expectedIncompatibleDiffs)); + assertThat(description + ": compatible cluster state diffs", stats.getCompatibleClusterStateDiffReceivedCount(), + equalTo(expectedCompatibleDiffs)); + } + + public void testPublishClusterStateStats() throws Exception { + 
MockNode nodeA = createMockNode("nodeA").setAsMaster(); + MockNode nodeB = createMockNode("nodeB"); + + assertPublishClusterStateStats("nodeA: initial state", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: initial state", nodeB, 0, 0, 0); + + // Initial cluster state + ClusterState clusterState = nodeA.clusterState; + + // cluster state update - add nodeB + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).add(nodeB.discoveryNode).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent as a full cluster state update + assertPublishClusterStateStats("nodeA: after full update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after full update", nodeB, 1, 0, 0); + + // Increment cluster state version + previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent, successfully, as a cluster state diff + assertPublishClusterStateStats("nodeA: after successful diff update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after successful diff update", nodeB, 1, 0, 1); + + // Increment cluster state version twice + previousClusterState = ClusterState.builder(clusterState).incrementVersion().build(); + clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); + publishStateAndWait(nodeA.action, clusterState, previousClusterState); + + // Sent, unsuccessfully, as a diff and then retried as a full update + assertPublishClusterStateStats("nodeA: after unsuccessful diff update", nodeA, 0, 0, 0); + assertPublishClusterStateStats("nodeB: after unsuccessful diff update", nodeB, 2, 1, 1); + + // node A steps down from being master + nodeA.resetMasterId(); + 
nodeB.resetMasterId(); + + // node B becomes the master and sends a version of the cluster state that goes back + discoveryNodes = DiscoveryNodes.builder(discoveryNodes) + .add(nodeA.discoveryNode) + .add(nodeB.discoveryNode) + .masterNodeId(nodeB.discoveryNode.getId()) + .localNodeId(nodeB.discoveryNode.getId()) + .build(); + previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build(); + clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); + publishStateAndWait(nodeB.action, clusterState, previousClusterState); + + // Sent, unsuccessfully, as a diff, and then retried as a full update + assertPublishClusterStateStats("nodeA: B became master", nodeA, 1, 1, 0); + assertPublishClusterStateStats("nodeB: B became master", nodeB, 2, 1, 1); + } private MetaData buildMetaDataForVersion(MetaData metaData, long version) { ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(metaData.indices()); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index 7821d4fd944..ed13f34b609 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import org.hamcrest.Matchers; import java.io.IOException; import java.net.UnknownHostException; @@ -255,6 +254,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase { " \"total\" : 0,\n" + " \"pending\" : 0,\n" + " \"committed\" : 0\n" + + " },\n" + + " \"published_cluster_states\" : {\n" + + " \"full_states\" : 0,\n" + + " \"incompatible_diffs\" : 0,\n" + + " \"compatible_diffs\" : 0\n" + " }\n" + " }\n" 
+ "}"; @@ -275,6 +279,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase { assertThat(stats.getQueueStats().getCommitted(), equalTo(0)); assertThat(stats.getQueueStats().getPending(), equalTo(0)); + assertThat(stats.getPublishStats(), notNullValue()); + assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L)); + assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L)); + assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L)); + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); builder.startObject(); stats.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml index 2617f76941c..629c8de164d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/30_discovery.yml @@ -1,5 +1,8 @@ --- "Discovery stats": + - skip: + version: " - 6.99.99" + reason: "published_cluster_states_received is not (yet) in 6.x" - do: cluster.state: {} @@ -15,6 +18,11 @@ - is_true: nodes.$master.name - is_false: nodes.$master.jvm - is_true: nodes.$master.discovery + - is_true: nodes.$master.discovery.cluster_state_queue + - is_true: nodes.$master.discovery.published_cluster_states + - gte: { nodes.$master.discovery.published_cluster_states.full_states: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 } - is_true: nodes.$master.roles - do: @@ -26,4 +34,9 @@ - is_false: nodes.$master.name - is_false: nodes.$master.jvm - is_true: nodes.$master.discovery + - is_true: nodes.$master.discovery.cluster_state_queue + - is_true: nodes.$master.discovery.published_cluster_states + - gte: { 
nodes.$master.discovery.published_cluster_states.full_states: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 } + - gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 } - is_false: nodes.$master.roles