more feedback

This commit is contained in:
Boaz Leskes 2015-08-27 17:15:18 +02:00
parent 0668e0d623
commit 10e8c410ea
1 changed files with 18 additions and 11 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
@ -220,9 +219,7 @@ public class PublishClusterStateAction extends AbstractComponent {
if (bytes == null) {
try {
bytes = serializeFullClusterState(clusterState, node.version());
if (serializedStates != null) {
serializedStates.put(node.version(), bytes);
}
serializedStates.put(node.version(), bytes);
} catch (Throwable e) {
logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
sendingController.onNodeSendFailed(node, e);
@ -478,6 +475,12 @@ public class PublishClusterStateAction extends AbstractComponent {
}
/**
* Coordinates acknowledgments of the sent cluster state from the different nodes. Commits the change
* after `minimum_master_nodes` have successfully responded or fails the entire change. After committing
* the cluster state, will trigger a commit message to all nodes that responded previously and responds immediately
* to all future acknowledgments.
*/
class SendingController {
private final ClusterState clusterState;
@ -543,14 +546,18 @@ public class PublishClusterStateAction extends AbstractComponent {
// we're still waiting
sendAckedBeforeCommit.add(node);
if (node.isMasterNode()) {
onMasterNodeSendAck(node);
checkForCommitOrFailIfNoPending(node);
}
}
}
synchronized private void onMasterNodeSendAck(DiscoveryNode node) {
/**
* check if enough master node responded to commit the change. fails the commit
* if there are no more pending master nodes but not enough acks to commit.
*/
synchronized private void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
neededMastersToCommit--;
if (neededMastersToCommit == 0) {
if (markAsCommitted()) {
@ -560,13 +567,13 @@ public class PublishClusterStateAction extends AbstractComponent {
sendAckedBeforeCommit.clear();
}
}
onMasterNodeDone(node);
decrementPendingMasterAcksAndChangeForFailure();
}
synchronized private void onMasterNodeDone(DiscoveryNode node) {
synchronized private void decrementPendingMasterAcksAndChangeForFailure() {
pendingMasterNodes--;
if (pendingMasterNodes == 0 && neededMastersToCommit > 0) {
markAsFailed("no more pending master nodes, but [" + neededMastersToCommit + "] acks are still needed");
markAsFailed("no more pending master nodes, but failed to reach needed acks ([" + neededMastersToCommit + "] left)");
}
}
@ -574,7 +581,7 @@ public class PublishClusterStateAction extends AbstractComponent {
if (node.isMasterNode()) {
logger.trace("master node {} failed to ack cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
node, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
onMasterNodeDone(node);
decrementPendingMasterAcksAndChangeForFailure();
}
publishResponseHandler.onFailure(node, t);
}