Made sure the new cluster state is not held by AckCountDownListener

(also made AckCountDownListener class static)
This commit is contained in:
Luca Cavanna 2013-12-10 11:15:46 +01:00
parent acf371ef57
commit ac158f6a87
1 changed files with 19 additions and 12 deletions

View File

@ -35,6 +35,8 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
@ -339,7 +341,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
ackedUpdateTask.onAckTimeout();
} else {
try {
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState, threadPool);
ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
@ -625,25 +627,30 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
private class AckCountDownListener implements Discovery.AckListener {
private static class AckCountDownListener implements Discovery.AckListener {
private static final ESLogger logger = Loggers.getLogger(AckCountDownListener.class);
private final AckedClusterStateUpdateTask ackedUpdateTask;
private final CountDown countDown;
private final ClusterState clusterState;
private final DiscoveryNodes nodes;
private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback;
private Throwable lastFailure;
AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, ClusterState clusterState, ThreadPool threadPool) {
AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) {
this.ackedUpdateTask = ackedUpdateTask;
this.clusterState = clusterState;
this.clusterStateVersion = clusterStateVersion;
this.nodes = nodes;
int countDown = 0;
for (DiscoveryNode node : clusterState.nodes()) {
for (DiscoveryNode node : nodes) {
if (ackedUpdateTask.mustAck(node)) {
countDown++;
}
}
//we always wait for at least 1 node (the master)
countDown = Math.max(1, countDown);
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterState.version());
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() {
@Override
@ -657,19 +664,19 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) {
if (!ackedUpdateTask.mustAck(node)) {
//we always wait for the master ack anyway
if (!node.equals(clusterState.nodes().masterNode())) {
if (!node.equals(nodes.masterNode())) {
return;
}
}
if (t == null) {
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterState.version());
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion);
} else {
this.lastFailure = t;
logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, clusterState.version());
logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, clusterStateVersion);
}
if (countDown.countDown()) {
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterState.version());
logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion);
ackTimeoutCallback.cancel(true);
ackedUpdateTask.onAllNodesAcked(lastFailure);
}
@ -678,7 +685,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
@Override
public void onTimeout() {
if (countDown.fastForward()) {
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterState.version());
logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", clusterStateVersion);
ackedUpdateTask.onAckTimeout();
}
}