remove committedOrFailed and use committedOrFailedLatch for state

This commit is contained in:
Boaz Leskes 2015-08-28 12:31:37 +02:00
parent 10e8c410ea
commit 218979da1b
1 changed files with 8 additions and 8 deletions

View File

@ -491,10 +491,9 @@ public class PublishClusterStateAction extends AbstractComponent {
private final BlockingClusterStatePublishResponseHandler publishResponseHandler; private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>(); final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>();
final CountDownLatch committedOrFailedLatch;
// writes and reads of these are protected under synchronization // writes and reads of these are protected under synchronization
boolean committedOrFailed; // true if a decision was made w.r.t committing or failing final CountDownLatch committedOrFailedLatch; // 0 count indicates that a decision was made w.r.t committing or failing
boolean committed; // true if cluster state was committed boolean committed; // true if cluster state was committed
int neededMastersToCommit; // number of master nodes acks still needed before committing int neededMastersToCommit; // number of master nodes acks still needed before committing
int pendingMasterNodes; // how many master node still need to respond int pendingMasterNodes; // how many master node still need to respond
@ -511,7 +510,6 @@ public class PublishClusterStateAction extends AbstractComponent {
throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes); throw new Discovery.FailedToCommitClusterStateException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
} }
this.committed = neededMastersToCommit == 0; this.committed = neededMastersToCommit == 0;
this.committedOrFailed = committed;
this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1); this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1);
} }
@ -540,7 +538,7 @@ public class PublishClusterStateAction extends AbstractComponent {
if (committed) { if (committed) {
assert sendAckedBeforeCommit.isEmpty(); assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, this); sendCommitToNode(node, clusterState, this);
} else if (committedOrFailed) { } else if (committedOrFailed()) {
logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version()); logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
} else { } else {
// we're still waiting // we're still waiting
@ -551,6 +549,10 @@ public class PublishClusterStateAction extends AbstractComponent {
} }
} }
private synchronized boolean committedOrFailed() {
return committedOrFailedLatch.getCount() == 0;
}
/** /**
* check if enough master node responded to commit the change. fails the commit * check if enough master node responded to commit the change. fails the commit
* if there are no more pending master nodes but not enough acks to commit. * if there are no more pending master nodes but not enough acks to commit.
@ -592,12 +594,11 @@ public class PublishClusterStateAction extends AbstractComponent {
* @return true if successful * @return true if successful
*/ */
synchronized private boolean markAsCommitted() { synchronized private boolean markAsCommitted() {
if (committedOrFailed) { if (committedOrFailed()) {
return committed; return committed;
} }
logger.trace("committing version [{}]", clusterState.version()); logger.trace("committing version [{}]", clusterState.version());
committed = true; committed = true;
committedOrFailed = true;
committedOrFailedLatch.countDown(); committedOrFailedLatch.countDown();
return true; return true;
} }
@ -608,11 +609,10 @@ public class PublishClusterStateAction extends AbstractComponent {
* @return true if the publishing was failed and the cluster state is *not* committed * @return true if the publishing was failed and the cluster state is *not* committed
**/ **/
synchronized private boolean markAsFailed(String reason) { synchronized private boolean markAsFailed(String reason) {
if (committedOrFailed) { if (committedOrFailed()) {
return committed == false; return committed == false;
} }
logger.trace("failed to commit version [{}]. {}", clusterState.version(), reason); logger.trace("failed to commit version [{}]. {}", clusterState.version(), reason);
committedOrFailed = true;
committed = false; committed = false;
committedOrFailedLatch.countDown(); committedOrFailedLatch.countDown();
return true; return true;