Improved concurrency controls In SendingController to make sure that a CS is never committed after publishing is marked out as timed out

This commit is contained in:
Boaz Leskes 2015-08-24 17:13:15 +02:00
parent 234a3794e5
commit e3e0aa5049
2 changed files with 106 additions and 32 deletions

View File

@ -459,13 +459,16 @@ public class PublishClusterStateAction extends AbstractComponent {
}
private final BlockingClusterStatePublishResponseHandler publishResponseHandler;
volatile int neededMastersToCommit;
int pendingMasterNodes;
final ArrayList<DiscoveryNode> sendAckedBeforeCommit = new ArrayList<>();
final CountDownLatch comittedOrFailed;
final AtomicBoolean committed;
final CountDownLatch committedOrFailedLatch;
// an external marker to note that the publishing process is timed out. This is usefull for proper logging.
// writes and reads of these are protected under synchronization
boolean committedOrFailed; // true if a decision was made w.r.t committing or failing
boolean committed; // true if cluster state was committed
int neededMastersToCommit; // number of master nodes acks still needed before committing
int pendingMasterNodes; // how many master node still need to respond
// an external marker to note that the publishing process is timed out. This is useful for proper logging.
final AtomicBoolean publishingTimedOut = new AtomicBoolean();
private SendingController(ClusterState clusterState, int minMasterNodes, int totalMasterNodes, BlockingClusterStatePublishResponseHandler publishResponseHandler) {
@ -476,60 +479,66 @@ public class PublishClusterStateAction extends AbstractComponent {
if (this.neededMastersToCommit > this.pendingMasterNodes) {
throw new FailedToCommitException("not enough masters to ack sent cluster state. [{}] needed , have [{}]", neededMastersToCommit, pendingMasterNodes);
}
this.committed = new AtomicBoolean(neededMastersToCommit == 0);
this.comittedOrFailed = new CountDownLatch(committed.get() ? 0 : 1);
this.committed = neededMastersToCommit == 0;
this.committedOrFailed = committed;
this.committedOrFailedLatch = new CountDownLatch(committed ? 0 : 1);
}
public void waitForCommit(TimeValue commitTimeout) {
boolean timedout = false;
try {
timedout = comittedOrFailed.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false;
timedout = committedOrFailedLatch.await(commitTimeout.millis(), TimeUnit.MILLISECONDS) == false;
} catch (InterruptedException e) {
}
//nocommit: make sure we prevent publishing successfully!
if (committed.get() == false) {
if (timedout) {
markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
}
if (isCommitted() == false) {
throw new FailedToCommitException("{} enough masters to ack sent cluster state. [{}] left",
timedout ? "timed out while waiting for" : "failed to get", neededMastersToCommit);
}
}
synchronized public boolean isCommitted() {
return committed;
}
synchronized public void onNodeSendAck(DiscoveryNode node) {
if (committed.get() == false) {
if (committed) {
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, this);
} else if (committedOrFailed) {
logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
} else {
// we're still waiting
sendAckedBeforeCommit.add(node);
if (node.isMasterNode()) {
onMasterNodeSendAck(node);
}
} else {
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, this);
}
}
private void onMasterNodeSendAck(DiscoveryNode node) {
synchronized private void onMasterNodeSendAck(DiscoveryNode node) {
logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
neededMastersToCommit--;
if (neededMastersToCommit == 0) {
logger.trace("committing version [{}]", clusterState.version());
for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
sendCommitToNode(nodeToCommit, clusterState, this);
if (markAsCommitted()) {
for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
sendCommitToNode(nodeToCommit, clusterState, this);
}
sendAckedBeforeCommit.clear();
}
sendAckedBeforeCommit.clear();
boolean success = committed.compareAndSet(false, true);
assert success;
comittedOrFailed.countDown();
}
onMasterNodeDone(node);
}
private void onMasterNodeDone(DiscoveryNode node) {
synchronized private void onMasterNodeDone(DiscoveryNode node) {
pendingMasterNodes--;
if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
logger.trace("failed to commit version [{}]. All master nodes acked or failed but [{}] acks are still needed",
clusterState.version(), neededMastersToCommit);
comittedOrFailed.countDown();
markAsFailed("All master nodes acked or failed but [" + neededMastersToCommit + "] acks are still needed");
}
}
@ -542,6 +551,38 @@ public class PublishClusterStateAction extends AbstractComponent {
publishResponseHandler.onFailure(node, t);
}
/**
* tries and commit the current state, if a decision wasn't made yet
*
* @return true if successful
*/
synchronized private boolean markAsCommitted() {
if (committedOrFailed) {
return committed;
}
logger.trace("committing version [{}]", clusterState.version());
committed = true;
committedOrFailed = true;
committedOrFailedLatch.countDown();
return true;
}
/**
* tries marking the publishing as failed, if a decision wasn't made yet
*
* @return true if the publishing was failed and the cluster state is *not* committed
**/
synchronized private boolean markAsFailed(String reason) {
if (committedOrFailed) {
return committed == false;
}
logger.trace("failed to commit version [{}]. {}", clusterState.version(), reason);
committedOrFailed = true;
committed = false;
committedOrFailedLatch.countDown();
return true;
}
public boolean getPublishingTimedOut() {
return publishingTimedOut.get();
}

View File

@ -360,7 +360,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
@Test
public void testSimultaneousClusterStatePublishing() throws Exception {
int numberOfNodes = randomIntBetween(2, 10);
int numberOfIterations = randomIntBetween(10, 50);
int numberOfIterations = scaledRandomIntBetween(5, 50);
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build();
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder();
MockNode master = null;
@ -490,7 +490,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
Settings.Builder settings = Settings.builder();
// make sure we have a reasonable timeout if we expect to timeout, o.w. one that will make the test "hang"
settings.put(DiscoverySettings.COMMIT_TIMEOUT, expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h")
.put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about comitting
.put(DiscoverySettings.PUBLISH_TIMEOUT, "5ms"); // test is about committing
MockNode master = createMockNode("master", settings.build());
@ -655,6 +655,39 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertSameState(node.clusterState, state2);
}
/**
* Tests that cluster is committed or times out. It should never be the case that we fail
* an update due to a commit timeout, but it ends up being committed anyway
*/
public void testTimeoutOrCommit() throws Exception {
Settings settings = Settings.builder()
.put(DiscoverySettings.COMMIT_TIMEOUT, "1ms").build(); // short but so we will sometime commit sometime timeout
MockNode master = createMockNode("master", settings);
MockNode node = createMockNode("node", settings);
ClusterState state = ClusterState.builder(master.clusterState)
.nodes(DiscoveryNodes.builder(master.clusterState.nodes()).put(node.discoveryNode).masterNodeId(master.discoveryNode.id())).build();
for (int i = 0; i < 10; i++) {
state = ClusterState.builder(state).incrementVersion().build();
logger.debug("--> publishing version [{}], UUID [{}]", state.version(), state.stateUUID());
boolean success;
try {
publishState(master.action, state, master.clusterState, 2).await(1, TimeUnit.HOURS);
success = true;
} catch (PublishClusterStateAction.FailedToCommitException OK) {
success = false;
}
logger.debug("--> publishing [{}], verifying...", success ? "succeeded" : "failed");
if (success) {
assertSameState(node.clusterState, state);
} else {
assertThat(node.clusterState.stateUUID(), not(equalTo(state.stateUUID())));
}
}
}
private MetaData buildMetaDataForVersion(MetaData metaData, long version) {
ImmutableOpenMap.Builder<String, IndexMetaData> indices = ImmutableOpenMap.builder(metaData.indices());
@ -693,7 +726,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
public static class AssertingAckListener implements Discovery.AckListener {
private final List<Tuple<DiscoveryNode, Throwable>> errors = new CopyOnWriteArrayList<>();
private final AtomicBoolean timeoutOccured = new AtomicBoolean();
private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
private final CountDownLatch countDown;
public AssertingAckListener(int nodeCount) {
@ -710,7 +743,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
@Override
public void onTimeout() {
timeoutOccured.set(true);
timeoutOccurred.set(true);
// Fast forward the counter - no reason to wait here
long currentCount = countDown.getCount();
for (long i = 0; i < currentCount; i++) {
@ -724,7 +757,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
public List<Tuple<DiscoveryNode, Throwable>> awaitErrors(long timeout, TimeUnit unit) throws InterruptedException {
countDown.await(timeout, unit);
assertFalse(timeoutOccured.get());
assertFalse(timeoutOccurred.get());
return errors;
}