[Zen2] Fix test failures in diff-based publishing (#35684)

`testIncompatibleDiffResendsFullState` sometimes makes a 2-node cluster and
then partitions one of the nodes from the leader, which makes the leader stand
down.  Then when the partition is removed the cluster re-forms but does so by
sending full cluster states, not diffs, causing the test to fail.

Additionally `testDiffBasedPublishing` sometimes fails if a publication is
delivered out-of-order, wiping out a fresher last-received cluster state with a
less-fresh one. This is fixed here by passing the received cluster state to the
coordinator before recording it as the last-received one, relying on the
coordinator's freshness checks.
This commit is contained in:
David Turner 2018-11-22 09:08:52 +00:00 committed by GitHub
parent c816347253
commit cfdf666672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 12 deletions

View File

@ -201,8 +201,10 @@ public class PublicationTransportHandler {
}
});
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
} else {
logger.trace("sending cluster state diff for version {} to {}", newState.version(), destination);
PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination,
responseActionListener);
}
@ -381,7 +383,6 @@ public class PublicationTransportHandler {
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
final Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
try {
if (compressor != null) {
in = compressor.streamInput(in);
@ -390,11 +391,13 @@ public class PublicationTransportHandler {
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
final ClusterState incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
lastSeenClusterState.set(incomingState);
return response;
} else {
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
@ -402,11 +405,13 @@ public class PublicationTransportHandler {
throw new IncompatibleClusterStateVersionException("have no local cluster state");
} else {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
final ClusterState incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
}
} catch (IncompatibleClusterStateVersionException e) {
@ -418,7 +423,5 @@ public class PublicationTransportHandler {
} finally {
IOUtils.close(in);
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}
}

View File

@ -784,19 +784,19 @@ public class CoordinatorTests extends ESTestCase {
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
if (cn == leader) {
// leader does not update publish stats as it's not using the serialized state
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
postPublishStats.get(cn).getFullClusterStateReceivedCount());
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
} else {
// followers receive a diff
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
postPublishStats.get(cn).getFullClusterStateReceivedCount());
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
}
}
@ -818,17 +818,21 @@ public class CoordinatorTests extends ESTestCase {
}
public void testIncompatibleDiffResendsFullState() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5));
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode leader = cluster.getAnyLeader();
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
logger.info("--> blackholing {}", follower);
follower.blackhole();
final PublishClusterStateStats prePublishStats = follower.coordinator.stats().getPublishStats();
logger.info("--> submitting first value to {}", leader);
leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY + defaultMillis(PUBLISH_TIMEOUT_SETTING), "publish first state");
logger.info("--> healing {}", follower);
follower.heal();
logger.info("--> submitting second value to {}", leader);
leader.submitValue(randomLong());
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
final PublishClusterStateStats postPublishStats = follower.coordinator.stats().getPublishStats();