diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 7722511b6ab..873e45d4a17 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -459,13 +459,16 @@ public class PublishClusterStateAction extends AbstractComponent { } private final BlockingClusterStatePublishResponseHandler publishResponseHandler; - volatile int neededMastersToCommit; - int pendingMasterNodes; final ArrayList sendAckedBeforeCommit = new ArrayList<>(); - final CountDownLatch comittedOrFailed; - final AtomicBoolean committed; + final CountDownLatch committedOrFailedLatch; - // an external marker to note that the publishing process is timed out. This is usefull for proper logging. + // writes and reads of these are protected under synchronization + boolean committedOrFailed; // true if a decision was made w.r.t committing or failing + boolean committed; // true if cluster state was committed + int neededMastersToCommit; // number of master nodes acks still needed before committing + int pendingMasterNodes; // how many master node still need to respond + + // an external marker to note that the publishing process is timed out. This is useful for proper logging. final AtomicBoolean publishingTimedOut = new AtomicBoolean(); private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) { @@ -476,60 +479,66 @@ public class PublishClusterStateAction extends AbstractComponent { if (this.neededMastersToCommit > this.pendingMasterNodes) { throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); } - this.committed = new AtomicBoolean(neededMastersToCommit == 0); - this.comittedOrFailed = new CountDownLatch(committed.get() ? 0 : 1); + this.committed = neededMastersToCommit == 0; + this.committedOrFailed = committed; + this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1); } public void waitForCommit(TimeValue commitTimeout) { boolean timedout = false; try { - timedout = comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false; + timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false; } catch (InterruptedException e) { } - //nocommit: make sure we prevent publishing successfully! - if (committed.get() == false) { + + if (timedout) { + markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]"); + } + if (isCommitted() == false) { throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left", timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit); } } + synchronized public boolean isCommitted() { + return committed; + } + synchronized public void onNodeSendAck(DiscoveryNode node) { - if (committed.get() == false) { + if (committed) { + assert sendAckedBeforeCommit.isEmpty(); + sendCommitToNode(node, clusterState, this); + } else if (committedOrFailed) { + logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version()); + } else { + // we're still waiting sendAckedBeforeCommit.add(node); if (node.isMasterNode()) { onMasterNodeSendAck(node); } - } else { - assert sendAckedBeforeCommit.isEmpty(); - sendCommitToNode(node, clusterState, this); } - } - private void onMasterNodeSendAck(DiscoveryNode node) { + synchronized private void onMasterNodeSendAck(DiscoveryNode node) { logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])", node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); neededMastersToCommit--; if (neededMastersToCommit == 0) { - logger.trace("committing version [{}]", clusterState.version()); - for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) { - sendCommitToNode(nodeToCommit, clusterState, this); + if (markAsCommitted()) { + for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) { + sendCommitToNode(nodeToCommit, clusterState, this); + } + sendAckedBeforeCommit.clear(); } - sendAckedBeforeCommit.clear(); - boolean success = committed.compareAndSet(false, true); - assert success; - comittedOrFailed.countDown(); } onMasterNodeDone(node); } - private void onMasterNodeDone(DiscoveryNode node) { + synchronized private void onMasterNodeDone(DiscoveryNode node) { pendingMasterNodes--; if (pendingMasterNodes == 0 && neededMastersToCommit > 0) { - logger.trace("failed to commit version [{}]. All master nodes acked or failed but [{}] acks are still needed", - clusterState.version(), neededMastersToCommit); - comittedOrFailed.countDown(); + markAsFailed("All master nodes acked or failed but [" + neededMastersToCommit + "] acks are still needed"); } } @@ -542,6 +551,38 @@ public class PublishClusterStateAction extends AbstractComponent { publishResponseHandler.onFailure(node, t); } + /** + * tries and commit the current state, if a decision wasn't made yet + * + * @return true if successful + */ + synchronized private boolean markAsCommitted() { + if (committedOrFailed) { + return committed; + } + logger.trace("committing version [{}]", clusterState.version()); + committed = true; + committedOrFailed = true; + committedOrFailedLatch.countDown(); + return true; + } + + /** + * tries marking the publishing as failed, if a decision wasn't made yet + * + * @return true if the publishing was failed and the cluster state is *not* committed + **/ + synchronized private boolean markAsFailed(String reason) { + if (committedOrFailed) { + return committed == false; + } + logger.trace("failed to commit version [{}]. {}", clusterState.version(), reason); + committedOrFailed = true; + committed = false; + committedOrFailedLatch.countDown(); + return true; + } + public boolean getPublishingTimedOut() { return publishingTimedOut.get(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 80224f055a1..994805ab3f4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -360,7 +360,7 @@ public class PublishClusterStateActionTests extends ESTestCase { @Test public void testSimultaneousClusterStatePublishing() throws Exception { int numberOfNodes = randomIntBetween(2, 10); - int numberOfIterations = randomIntBetween(10, 50); + int numberOfIterations = scaledRandomIntBetween(5, 50); Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build(); DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); MockNode master = null; @@ -490,7 +490,7 @@ public class PublishClusterStateActionTests extends ESTestCase { Settings.Builder settings = Settings.builder(); // make sure we have a reasonable timeout if we expect to timeout, o.w. one that will make the test "hang" settings.put(DiscoverySettings.COMMIT_TIMEOUT, expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h") - .put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about comitting + .put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about committing MockNode master = createMockNode("master", settings.build()); @@ -655,6 +655,39 @@ public class PublishClusterStateActionTests extends ESTestCase { assertSameState(node.clusterState, state2); } + /** + * Tests that cluster is committed or times out. It should never be the case that we fail + * an update due to a commit timeout, but it ends up being committed anyway + */ + public void testTimeoutOrCommit() throws Exception { + Settings settings = Settings.builder() + .put(DiscoverySettings.COMMIT_TIMEOUT, "1ms").build(); // short but so we will sometime commit sometime timeout + + MockNode master = createMockNode("master", settings); + MockNode node = createMockNode("node", settings); + ClusterState state = ClusterState.builder(master.clusterState) + .nodes(DiscoveryNodes.builder(master.clusterState.nodes()).put(node.discoveryNode).masterNodeId(master.discoveryNode.id())).build(); + + for (int i = 0; i < 10; i++) { + state = ClusterState.builder(state).incrementVersion().build(); + logger.debug("--> publishing version [{}], UUID [{}]", state.version(), state.stateUUID()); + boolean success; + try { + publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS); + success = true; + } catch (PublishClusterStateAction.FailedToCommitException OK) { + success = false; + } + logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed"); + + if (success) { + assertSameState(node.clusterState, state); + } else { + assertThat(node.clusterState.stateUUID(), not(equalTo(state.stateUUID()))); + } + } + } + private MetaData buildMetaDataForVersion(MetaData metaData, long version) { ImmutableOpenMap.Builder indices = ImmutableOpenMap.builder(metaData.indices()); @@ -693,7 +726,7 @@ public class PublishClusterStateActionTests extends ESTestCase { public static class AssertingAckListener implements Discovery.AckListener { private final List> errors = new CopyOnWriteArrayList<>(); - private final AtomicBoolean timeoutOccured = new AtomicBoolean(); + private final AtomicBoolean timeoutOccurred = new AtomicBoolean(); private final CountDownLatch countDown; public AssertingAckListener(int nodeCount) { @@ -710,7 +743,7 @@ public class PublishClusterStateActionTests extends ESTestCase { @Override public void onTimeout() { - timeoutOccured.set(true); + timeoutOccurred.set(true); // Fast forward the counter - no reason to wait here long currentCount = countDown.getCount(); for (long i = 0; i < currentCount; i++) { @@ -724,7 +757,7 @@ public class PublishClusterStateActionTests extends ESTestCase { public List> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException { countDown.await(timeout, unit); - assertFalse(timeoutOccured.get()); + assertFalse(timeoutOccurred.get()); return errors; }