Discovery to support a timeout waiting for other nodes to process new cluster state

The master node processes changes to the cluster state, and part of that processing is publishing the cluster state to the other nodes. It does not wait for the cluster state to be processed on the other nodes before it moves on to the next cluster state processing job.

This is fine: we support out-of-order cluster state events using versioning, and nodes can handle those cases. It does, though, lead to non-optimal API semantics. For example, when issuing a cluster health request and waiting for green state, the master node will report back once the cluster is green based on its own cluster state, but that "green" state might not have been received by all other nodes yet.

Add a discovery.zen.publish_timeout setting, and default it to 5s. This makes a best effort to ensure that all nodes process a published cluster state within a window of time.

closes #3736
This commit is contained in:
Shay Banon 2013-09-19 13:09:14 +02:00
parent cf0c360f86
commit 1581f25e27
4 changed files with 125 additions and 24 deletions

View File

@ -380,13 +380,13 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// we publish here before we send a notification to all the listeners, since if it fails
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("Publishing cluster state version {}", newClusterState.version());
logger.debug("publishing cluster state version {}", newClusterState.version());
discoveryService.publish(newClusterState);
}
// update the current cluster state
clusterState = newClusterState;
logger.debug("Set cluster state to version {}. Broadcasting to listeners.", newClusterState.version());
logger.debug("set local cluster state to version {}", newClusterState.version());
for (ClusterStateListener listener : priorityClusterStateListeners) {
listener.clusterChanged(clusterChangedEvent);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
@ -43,6 +44,8 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -62,6 +65,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private final ClusterName clusterName;
private final Version version;
private final TimeValue publishTimeout;
private DiscoveryNode localNode;
private volatile boolean master = false;
@ -83,6 +88,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", TimeValue.timeValueSeconds(5));
}
@Override
@ -283,8 +290,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
try {
// we do the marshaling intentionally, to check it works well...
final byte[] clusterStateBytes = Builder.toBytes(clusterState);
for (LocalDiscovery discovery : clusterGroup.members()) {
LocalDiscovery[] members = clusterGroup.members().toArray(new LocalDiscovery[0]);
final CountDownLatch latch = new CountDownLatch(members.length);
for (LocalDiscovery discovery : members) {
if (discovery.master) {
latch.countDown();
continue;
}
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
@ -308,15 +318,33 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
latch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
latch.countDown();
}
});
} else {
latch.countDown();
}
}
if (publishTimeout.millis() > 0) {
try {
boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS);
if (!awaited) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
// failure to marshal or un-marshal
throw new ElasticSearchIllegalStateException("Cluster state failed to serialize", e);

View File

@ -533,9 +533,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
}
void handleNewClusterStateFromMaster(final ClusterState newState) {
void handleNewClusterStateFromMaster(final ClusterState newState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
if (master) {
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (newState.version() > currentState.version()) {
@ -553,15 +553,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
newStateProcessed.onNewClusterStateProcessed();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
newStateProcessed.onNewClusterStateFailed(t);
}
});
} else {
if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
newStateProcessed.onNewClusterStateFailed(new ElasticSearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
@ -612,11 +619,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
newStateProcessed.onNewClusterStateFailed(t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
newStateProcessed.onNewClusterStateProcessed();
}
});
}
@ -775,9 +784,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
private class NewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener {
@Override
public void onNewClusterState(ClusterState clusterState) {
handleNewClusterStateFromMaster(clusterState);
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
handleNewClusterStateFromMaster(clusterState, newStateProcessed);
}
}

View File

@ -29,12 +29,15 @@ import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
@ -42,13 +45,23 @@ import java.util.Map;
public class PublishClusterStateAction extends AbstractComponent {
public static interface NewClusterStateListener {
void onNewClusterState(ClusterState clusterState);
static interface NewStateProcessed {
void onNewClusterStateProcessed();
void onNewClusterStateFailed(Throwable t);
}
void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed);
}
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
private final TimeValue publishTimeout;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener) {
super(settings);
@ -56,6 +69,8 @@ public class PublishClusterStateAction extends AbstractComponent {
this.nodesProvider = nodesProvider;
this.listener = listener;
this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", TimeValue.timeValueSeconds(5));
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}
@ -67,9 +82,12 @@ public class PublishClusterStateAction extends AbstractComponent {
DiscoveryNode localNode = nodesProvider.nodes().localNode();
Map<Version, BytesReference> serializedStates = Maps.newHashMap();
final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size());
for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) {
// no need to send to our self
latch.countDown();
continue;
}
// try and serialize the cluster state once (or per version), so we don't serialize it
@ -84,21 +102,49 @@ public class PublishClusterStateAction extends AbstractComponent {
stream.close();
bytes = bStream.bytes();
serializedStates.put(node.version(), bytes);
} catch (Exception e) {
logger.warn("failed to serialize cluster_state before publishing it to nodes", e);
return;
} catch (Throwable e) {
logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
latch.countDown();
continue;
}
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(bytes, node.version()),
TransportRequestOptions.options().withHighType().withCompress(false), // no need to compress, we already compressed the bytes
try {
TransportRequestOptions options = TransportRequestOptions.options().withHighType().withCompress(false);
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION,
new PublishClusterStateRequest(bytes, node.version()),
options, // no need to compress, we already compressed the bytes
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
});
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
latch.countDown();
}
@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}]", exp, node);
latch.countDown();
}
});
} catch (Throwable t) {
latch.countDown();
}
}
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS);
if (!awaited) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
}
@ -139,7 +185,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
@Override
public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
public void messageReceived(PublishClusterStateRequest request, final TransportChannel channel) throws Exception {
Compressor compressor = CompressorFactory.compressor(request.clusterStateInBytes);
StreamInput in;
if (compressor != null) {
@ -149,9 +195,26 @@ public class PublishClusterStateAction extends AbstractComponent {
}
in.setVersion(request.version);
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
logger.debug("Received clusterstate version {}", clusterState.version());
listener.onNewClusterState(clusterState);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
logger.debug("received cluster state version {}", clusterState.version());
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
}
@Override
public void onNewClusterStateFailed(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
}
});
}
@Override