mirror of
synced 2025-03-09 14:34:43 +00:00
Optimize multiple cluster state processing on receiving nodes
Nodes that receive the cluster state, and they have several of those pending, can optimize and try and process potentially only one of those. closes #5139
This commit is contained in:
@ -369,9 +369,9 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
newClusterState = builder.build();
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
} else if (newClusterState.version() < previousClusterState.version()) {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
logger.debug("got old cluster state [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
// we got a cluster state with older version, when we are *not* the master, let it in since it might be valid
// we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster
logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]");
@ -19,6 +19,7 @@
package org.elasticsearch.discovery.local;
import com.google.common.base.Objects;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
@ -306,6 +307,10 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
@ -19,6 +19,7 @@
package org.elasticsearch.discovery.zen;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
@ -43,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
@ -64,6 +66,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@ -527,8 +530,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
void handleNewClusterStateFromMaster(final ClusterState newState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
static class ProcessClusterState {
final ClusterState clusterState;
final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed;
volatile boolean processed;
ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
this.clusterState = clusterState;
this.newStateProcessed = newStateProcessed;
private final BlockingQueue<ProcessClusterState> processNewClusterStates = ConcurrentCollections.newBlockingQueue();
void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
if (master) {
final ClusterState newState = newClusterState;
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
public ClusterState execute(ClusterState currentState) {
@ -560,17 +577,63 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} else {
if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
if (newClusterState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode());
newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
public ClusterState execute(ClusterState currentState) {
// we already processed it in a previous event
if (processClusterState.processed) {
return currentState;
// TODO: once improvement that we can do is change the message structure to include version and masterNodeId
// at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page
// to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state
// try and get the state with the highest version out of all the ones with the same master node id
ProcessClusterState stateToProcess = processNewClusterStates.poll();
if (stateToProcess == null) {
return currentState;
stateToProcess.processed = true;
while (true) {
ProcessClusterState potentialState = processNewClusterStates.peek();
// nothing else in the queue, bail
if (potentialState == null) {
// if its not from the same master, then bail
if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) {
// we are going to use it for sure, poll (remove) it
potentialState = processNewClusterStates.poll();
potentialState.processed = true;
if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
// we found a new one
stateToProcess = potentialState;
ClusterState updatedState = stateToProcess.clusterState;
// if the new state has a smaller version, and it has the same master node, then no need to process it
if (updatedState.version() < currentState.version() && Objects.equal(updatedState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
// we don't need to do this, since we ping the master, and get notified when it has moved from being a master
// because it doesn't have enough master nodes...
@ -578,25 +641,25 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]");
latestDiscoNodes = newState.nodes();
latestDiscoNodes = updatedState.nodes();
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
ClusterState.Builder builder = ClusterState.builder(newState);
ClusterState.Builder builder = ClusterState.builder(updatedState);
// if the routing table did not change, use the original one
if (newState.routingTable().version() == currentState.routingTable().version()) {
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
// same for metadata
if (newState.metaData().version() == currentState.metaData().version()) {
if (updatedState.metaData().version() == currentState.metaData().version()) {
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(newState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : newState.metaData()) {
MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : updatedState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index());
if (currentIndexMetaData == null || currentIndexMetaData.version() != indexMetaData.version()) {
metaDataBuilder.put(indexMetaData, false);
Reference in New Issue
Block a user