From 10e8c410ea853654868df52dc06883826562b839 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 27 Aug 2015 17:15:18 +0200 Subject: [PATCH] more feedback --- .../publish/PublishClusterStateAction.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index adddfb1a376..a9b9010d910 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.compress.Compressor; @@ -220,9 +219,7 @@ public class PublishClusterStateAction extends AbstractComponent { if (bytes == null) { try { bytes = serializeFullClusterState(clusterState, node.version()); - if (serializedStates != null) { - serializedStates.put(node.version(), bytes); - } + serializedStates.put(node.version(), bytes); } catch (Throwable e) { logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node); sendingController.onNodeSendFailed(node, e); @@ -478,6 +475,12 @@ public class PublishClusterStateAction extends AbstractComponent { } + /** + * Coordinates acknowledgments of the sent cluster state from the different nodes. Commits the change + * after `minimum_master_nodes` have successfully responded or fails the entire change. After committing + * the cluster state, will trigger a commit message to all nodes that responded previously and responds immediately + * to all future acknowledgments. + */ class SendingController { private final ClusterState clusterState; @@ -543,14 +546,18 @@ public class PublishClusterStateAction extends AbstractComponent { // we're still waiting sendAckedBeforeCommit.add(node); if (node.isMasterNode()) { - onMasterNodeSendAck(node); + checkForCommitOrFailIfNoPending(node); } } } - synchronized private void onMasterNodeSendAck(DiscoveryNode node) { + /** + * check if enough master node responded to commit the change. fails the commit + * if there are no more pending master nodes but not enough acks to commit. + */ + synchronized private void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) { logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])", - node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); + masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit); neededMastersToCommit--; if (neededMastersToCommit == 0) { if (markAsCommitted()) { @@ -560,13 +567,13 @@ public class PublishClusterStateAction extends AbstractComponent { sendAckedBeforeCommit.clear(); } } - onMasterNodeDone(node); + decrementPendingMasterAcksAndChangeForFailure(); } - synchronized private void onMasterNodeDone(DiscoveryNode node) { + synchronized private void decrementPendingMasterAcksAndChangeForFailure() { pendingMasterNodes--; if (pendingMasterNodes == 0 && neededMastersToCommit > 0) { - markAsFailed("no more pending master nodes, but [" + neededMastersToCommit + "] acks are still needed"); + markAsFailed("no more pending master nodes, but failed to reach needed acks ([" + neededMastersToCommit + "] left)"); } } @@ -574,7 +581,7 @@ public class PublishClusterStateAction extends AbstractComponent { if (node.isMasterNode()) { logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])", node, clusterState.version(), pendingMasterNodes, neededMastersToCommit); - onMasterNodeDone(node); + decrementPendingMasterAcksAndChangeForFailure(); } publishResponseHandler.onFailure(node, t); }