Stats to record how often the ClusterState diff mechanism is used successfully (#26973)

It's believed that using diffs obsoletes the other mechanism for reusing the
bits of the ClusterState that didn't change between updates, but in fact we
don't know for sure how often the diff mechanism works successfully. The stats
collected here will tell us.
This commit is contained in:
David Turner 2017-10-25 07:35:25 +01:00 committed by GitHub
parent 6bc7024f26
commit cc3364e4f8
9 changed files with 236 additions and 10 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -26,33 +27,48 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import java.io.IOException;
public class DiscoveryStats implements Writeable, ToXContentFragment {
@Nullable
private final PendingClusterStateStats queueStats;
private final PublishClusterStateStats publishStats;
/**
 * Creates discovery stats from its two component stats objects. Neither argument is validated here; both are stored as given.
 *
 * @param queueStats   stats describing the pending cluster state queue; may be {@code null}
 * @param publishStats stats describing the cluster states received via publication; may be {@code null}
 */
public DiscoveryStats(PendingClusterStateStats queueStats) {
public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) {
this.queueStats = queueStats;
this.publishStats = publishStats;
}
/**
 * Reads discovery stats from a stream. The queue stats are always read as an optional value; the publish stats are
 * only read when the stream's version is on or after {@link Version#V_7_0_0_alpha1}, and are {@code null} otherwise.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public DiscoveryStats(StreamInput in) throws IOException {
    this.queueStats = in.readOptionalWriteable(PendingClusterStateStats::new);
    // Streams from before 7.0.0-alpha1 carry no publish stats, so there is nothing to read for them.
    this.publishStats = in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)
        ? in.readOptionalWriteable(PublishClusterStateStats::new)
        : null;
}
/**
 * Writes these stats to a stream: the optional queue stats first, followed by the optional publish stats, the latter
 * only when the stream's version is on or after {@link Version#V_7_0_0_alpha1}. This mirrors the stream constructor.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeOptionalWriteable(queueStats);
    final boolean streamCarriesPublishStats = out.getVersion().onOrAfter(Version.V_7_0_0_alpha1);
    if (streamCarriesPublishStats) {
        out.writeOptionalWriteable(publishStats);
    }
}
/**
 * Renders these stats as a single object named by {@code Fields.DISCOVERY}. Each component that is not {@code null}
 * appends its own content inside that object; a {@code null} component is simply omitted.
 *
 * @param builder the builder to write to
 * @param params  rendering parameters, passed through unchanged to each component
 * @return the same builder, to allow chaining
 * @throws IOException if writing to the builder fails
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DISCOVERY);
if (queueStats != null ){
if (queueStats != null) {
queueStats.toXContent(builder, params);
}
if (publishStats != null) {
publishStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
@ -64,4 +80,8 @@ public class DiscoveryStats implements Writeable, ToXContentFragment {
/**
 * @return the pending cluster state queue stats held by this object, or {@code null} if none were supplied
 */
public PendingClusterStateStats getQueueStats() {
    return this.queueStats;
}
/**
 * @return the publish stats held by this object, or {@code null} if none were supplied or if this object was read
 *         from a stream whose version is before 7.0.0-alpha1
 */
public PublishClusterStateStats getPublishStats() {
    return this.publishStats;
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery.single;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -34,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -94,7 +94,7 @@ public class SingleNodeDiscovery extends AbstractLifecycleComponent implements D
/**
 * Returns discovery stats for this discovery implementation. Both components (queue stats and publish stats) are
 * passed as {@code null}, so the resulting stats object carries no component stats.
 */
@Override
public DiscoveryStats stats() {
return new DiscoveryStats((PendingClusterStateStats) null);
return new DiscoveryStats(null, null);
}
@Override

View File

@ -65,6 +65,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class PublishClusterStateAction extends AbstractComponent {
@ -90,6 +91,10 @@ public class PublishClusterStateAction extends AbstractComponent {
private final IncomingClusterStateListener incomingClusterStateListener;
private final DiscoverySettings discoverySettings;
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
public PublishClusterStateAction(
Settings settings,
TransportService transportService,
@ -380,11 +385,13 @@ public class PublishClusterStateAction extends AbstractComponent {
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
incomingState = diff.apply(lastSeenClusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
} else {
@ -394,6 +401,9 @@ public class PublishClusterStateAction extends AbstractComponent {
incomingClusterStateListener.onIncomingClusterState(incomingState);
lastSeenClusterState = incomingState;
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} finally {
IOUtils.close(in);
}
@ -636,4 +646,11 @@ public class PublishClusterStateAction extends AbstractComponent {
publishingTimedOut.set(isTimedOut);
}
}
/**
 * Captures the current values of the three received-cluster-state counters in a new stats object. Each counter is
 * read with its own {@code get()} call, in the order full / incompatible diff / compatible diff.
 *
 * @return a stats object holding the counter values read at the time of the call
 */
public PublishClusterStateStats stats() {
    final long fullStates = fullClusterStateReceivedCount.get();
    final long incompatibleDiffs = incompatibleClusterStateDiffReceivedCount.get();
    final long compatibleDiffs = compatibleClusterStateDiffReceivedCount.get();
    return new PublishClusterStateStats(fullStates, incompatibleDiffs, compatibleDiffs);
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
 * Class encapsulating stats about the PublishClusterStateAction: how many full cluster states, incompatible
 * cluster-state diffs and compatible cluster-state diffs have been received. Instances are immutable.
 */
public class PublishClusterStateStats implements Writeable, ToXContentObject {

    private final long fullClusterStateReceivedCount;
    private final long incompatibleClusterStateDiffReceivedCount;
    private final long compatibleClusterStateDiffReceivedCount;

    /**
     * @param fullClusterStateReceivedCount the number of times this node has received a full copy of the cluster state from the master.
     * @param incompatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master
     *                                                  that was not compatible with the cluster state this node held.
     * @param compatibleClusterStateDiffReceivedCount the number of times this node has received a cluster-state diff from the master
     *                                                that was compatible with the cluster state this node held.
     */
    public PublishClusterStateStats(long fullClusterStateReceivedCount,
                                    long incompatibleClusterStateDiffReceivedCount,
                                    long compatibleClusterStateDiffReceivedCount) {
        this.fullClusterStateReceivedCount = fullClusterStateReceivedCount;
        this.incompatibleClusterStateDiffReceivedCount = incompatibleClusterStateDiffReceivedCount;
        this.compatibleClusterStateDiffReceivedCount = compatibleClusterStateDiffReceivedCount;
    }

    /**
     * Reads the three counters from a stream, in the same order in which {@link #writeTo(StreamOutput)} writes them.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public PublishClusterStateStats(StreamInput in) throws IOException {
        fullClusterStateReceivedCount = in.readVLong();
        incompatibleClusterStateDiffReceivedCount = in.readVLong();
        compatibleClusterStateDiffReceivedCount = in.readVLong();
    }

    /**
     * Writes the three counters to a stream in the order full / incompatible diff / compatible diff.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(fullClusterStateReceivedCount);
        out.writeVLong(incompatibleClusterStateDiffReceivedCount);
        out.writeVLong(compatibleClusterStateDiffReceivedCount);
    }

    /**
     * Renders the counters as an object named {@code published_cluster_states} with the fields {@code full_states},
     * {@code incompatible_diffs} and {@code compatible_diffs}.
     *
     * NOTE(review): because the object is started with a field name, this presumably has to be called while the
     * builder is already inside an enclosing object - confirm against callers.
     *
     * @param builder the builder to write to
     * @param params  rendering parameters; not used by this method
     * @return the same builder, to allow chaining
     * @throws IOException if writing to the builder fails
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject("published_cluster_states");
        {
            builder.field("full_states", fullClusterStateReceivedCount);
            builder.field("incompatible_diffs", incompatibleClusterStateDiffReceivedCount);
            builder.field("compatible_diffs", compatibleClusterStateDiffReceivedCount);
        }
        builder.endObject();
        return builder;
    }

    /** @return the number of full cluster states received */
    long getFullClusterStateReceivedCount() {
        return fullClusterStateReceivedCount;
    }

    /** @return the number of received cluster-state diffs that were incompatible */
    long getIncompatibleClusterStateDiffReceivedCount() {
        return incompatibleClusterStateDiffReceivedCount;
    }

    /** @return the number of received cluster-state diffs that were compatible */
    long getCompatibleClusterStateDiffReceivedCount() {
        return compatibleClusterStateDiffReceivedCount;
    }

    @Override
    public String toString() {
        return "PublishClusterStateStats(full=" + fullClusterStateReceivedCount
            + ", incompatible=" + incompatibleClusterStateDiffReceivedCount
            + ", compatible=" + compatibleClusterStateDiffReceivedCount
            + ")";
    }
}

View File

@ -412,8 +412,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
/**
 * Returns discovery stats built from the stats of the pending states queue and the stats of the publish cluster
 * state action, each obtained by calling its {@code stats()} method at the time of this call.
 */
@Override
public DiscoveryStats stats() {
PendingClusterStateStats queueStats = pendingStatesQueue.stats();
return new DiscoveryStats(queueStats);
return new DiscoveryStats(pendingStatesQueue.stats(), publishClusterState.stats());
}
public DiscoverySettings getDiscoverySettings() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
import org.elasticsearch.indices.breaker.CircuitBreakerStats;
@ -411,8 +412,18 @@ public class NodeStatsTests extends ESTestCase {
}
ScriptStats scriptStats = frequently() ?
new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null;
DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats(randomBoolean() ? new PendingClusterStateStats(randomInt(),
randomInt(), randomInt()) : null) : null;
DiscoveryStats discoveryStats = frequently()
? new DiscoveryStats(
randomBoolean()
? new PendingClusterStateStats(randomInt(), randomInt(), randomInt())
: null,
randomBoolean()
? new PublishClusterStateStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong())
: null)
: null;
IngestStats ingestStats = null;
if (frequently()) {
IngestStats.Stats totalStats = new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),

View File

@ -705,6 +705,73 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
}
/**
 * Asserts that the publish stats currently reported by the given node's action match the expected counters.
 * The three counters are checked in the order full / incompatible diff / compatible diff, and each assertion
 * message is prefixed with {@code description}.
 */
private void assertPublishClusterStateStats(String description, MockNode node, long expectedFull, long expectedIncompatibleDiffs,
                                            long expectedCompatibleDiffs) {
    final PublishClusterStateStats received = node.action.stats();
    assertThat(
        description + ": full cluster states",
        received.getFullClusterStateReceivedCount(),
        equalTo(expectedFull));
    assertThat(
        description + ": incompatible cluster state diffs",
        received.getIncompatibleClusterStateDiffReceivedCount(),
        equalTo(expectedIncompatibleDiffs));
    assertThat(
        description + ": compatible cluster state diffs",
        received.getCompatibleClusterStateDiffReceivedCount(),
        equalTo(expectedCompatibleDiffs));
}
/**
 * Walks two mock nodes through a sequence of publications and checks the (full, incompatible diff, compatible diff)
 * counters reported by each node after every step. The steps build on each other, so their order matters:
 * a first publication that adds nodeB, a plain version bump, a publication whose "previous" state was never
 * itself published, and finally a publication from nodeB after it takes over as master.
 */
public void testPublishClusterStateStats() throws Exception {
MockNode nodeA = createMockNode("nodeA").setAsMaster();
MockNode nodeB = createMockNode("nodeB");
// Nothing has been published yet, so every counter starts at zero on both nodes
assertPublishClusterStateStats("nodeA: initial state", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: initial state", nodeB, 0, 0, 0);
// Initial cluster state
ClusterState clusterState = nodeA.clusterState;
// cluster state update - add nodeB
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).add(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
// Sent as a full cluster state update
assertPublishClusterStateStats("nodeA: after full update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after full update", nodeB, 1, 0, 0);
// Increment cluster state version
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
// Sent, successfully, as a cluster state diff
assertPublishClusterStateStats("nodeA: after successful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after successful diff update", nodeB, 1, 0, 1);
// Increment cluster state version twice
// (the intermediate state becomes previousClusterState but is not itself passed to publishStateAndWait as the new state)
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
// Sent, unsuccessfully, as a diff and then retried as a full update
assertPublishClusterStateStats("nodeA: after unsuccessful diff update", nodeA, 0, 0, 0);
assertPublishClusterStateStats("nodeB: after unsuccessful diff update", nodeB, 2, 1, 1);
// node A steps down from being master
nodeA.resetMasterId();
nodeB.resetMasterId();
// node B becomes the master and sends a version of the cluster state that goes back
discoveryNodes = DiscoveryNodes.builder(discoveryNodes)
.add(nodeA.discoveryNode)
.add(nodeB.discoveryNode)
.masterNodeId(nodeB.discoveryNode.getId())
.localNodeId(nodeB.discoveryNode.getId())
.build();
// previousClusterState is built fresh from just a cluster name and the nodes, not derived from clusterState
previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeB.action, clusterState, previousClusterState);
// Sent, unsuccessfully, as a diff, and then retried as a full update
// (nodeA is now the receiver, so its counters change while nodeB's stay as they were)
assertPublishClusterStateStats("nodeA: B became master", nodeA, 1, 1, 0);
assertPublishClusterStateStats("nodeB: B became master", nodeB, 2, 1, 1);
}
private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());

View File

@ -47,7 +47,6 @@ import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.net.UnknownHostException;
@ -255,6 +254,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
" \"total\" : 0,\n" +
" \"pending\" : 0,\n" +
" \"committed\" : 0\n" +
" },\n" +
" \"published_cluster_states\" : {\n" +
" \"full_states\" : 0,\n" +
" \"incompatible_diffs\" : 0,\n" +
" \"compatible_diffs\" : 0\n" +
" }\n" +
" }\n" +
"}";
@ -275,6 +279,11 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
assertThat(stats.getQueueStats().getCommitted(), equalTo(0));
assertThat(stats.getQueueStats().getPending(), equalTo(0));
assertThat(stats.getPublishStats(), notNullValue());
assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
stats.toXContent(builder, ToXContent.EMPTY_PARAMS);

View File

@ -1,5 +1,8 @@
---
"Discovery stats":
- skip:
version: " - 6.99.99"
reason: "published_cluster_states is not (yet) in 6.x"
- do:
cluster.state: {}
@ -15,6 +18,11 @@
- is_true: nodes.$master.name
- is_false: nodes.$master.jvm
- is_true: nodes.$master.discovery
- is_true: nodes.$master.discovery.cluster_state_queue
- is_true: nodes.$master.discovery.published_cluster_states
- gte: { nodes.$master.discovery.published_cluster_states.full_states: 0 }
- gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 }
- gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 }
- is_true: nodes.$master.roles
- do:
@ -26,4 +34,9 @@
- is_false: nodes.$master.name
- is_false: nodes.$master.jvm
- is_true: nodes.$master.discovery
- is_true: nodes.$master.discovery.cluster_state_queue
- is_true: nodes.$master.discovery.published_cluster_states
- gte: { nodes.$master.discovery.published_cluster_states.full_states: 0 }
- gte: { nodes.$master.discovery.published_cluster_states.incompatible_diffs: 0 }
- gte: { nodes.$master.discovery.published_cluster_states.compatible_diffs: 0 }
- is_false: nodes.$master.roles