diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 5a237fd7e3c..a44914168e2 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -369,9 +369,9 @@ public class InternalClusterService extends AbstractLifecycleComponent implem discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { + return currentState; + } + ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); // if the routing table did not change, use the original one if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 92f59a44545..ca59bfad0bf 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; @@ -43,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoverySettings; @@ -64,6 +66,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -527,8 +530,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } - void handleNewClusterStateFromMaster(final ClusterState newState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { + static class ProcessClusterState { + final ClusterState clusterState; + final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed; + volatile boolean processed; + + ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { + this.clusterState = clusterState; + this.newStateProcessed = newStateProcessed; + } + } + + private final BlockingQueue processNewClusterStates = ConcurrentCollections.newBlockingQueue(); + + void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { if (master) { + final ClusterState newState = newClusterState; clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -560,17 +577,63 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } else { - if (newState.nodes().localNode() == null) { - logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode()); + if (newClusterState.nodes().localNode() == null) { + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode()); newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster")); } else { if (currentJoinThread != null) { logger.debug("got a new state from master node, though we are already trying to rejoin the cluster"); } - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); + processNewClusterStates.add(processClusterState); + + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + // we already processed it in a previous event + if (processClusterState.processed) { + return currentState; + } + + // TODO: once improvement that we can do is change the message structure to include version and masterNodeId + // at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page + // to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state + + + // try and get the state with the highest version out of all the ones with the same master node id + ProcessClusterState stateToProcess = processNewClusterStates.poll(); + if (stateToProcess == null) { + return currentState; + } + stateToProcess.processed = true; + while (true) { + ProcessClusterState potentialState = processNewClusterStates.peek(); + // nothing else in the queue, bail + if (potentialState == null) { + break; + } + // if its not from the same master, then bail + if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) { + break; + } + + // we are going to use it for sure, poll (remove) it + potentialState = processNewClusterStates.poll(); + potentialState.processed = true; + + if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) { + // we found a new one + stateToProcess = potentialState; + } + } + + ClusterState updatedState = stateToProcess.clusterState; + + // if the new state has a smaller version, and it has the same master node, then no need to process it + if (updatedState.version() < currentState.version() && Objects.equal(updatedState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { + return currentState; + } // we don't need to do this, since we ping the master, and get notified when it has moved from being a master // because it doesn't have enough master nodes... @@ -578,25 +641,25 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]"); //} - latestDiscoNodes = newState.nodes(); + latestDiscoNodes = updatedState.nodes(); // check to see that we monitor the correct master of the cluster if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) { masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } - ClusterState.Builder builder = ClusterState.builder(newState); + ClusterState.Builder builder = ClusterState.builder(updatedState); // if the routing table did not change, use the original one - if (newState.routingTable().version() == currentState.routingTable().version()) { + if (updatedState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); } // same for metadata - if (newState.metaData().version() == currentState.metaData().version()) { + if (updatedState.metaData().version() == currentState.metaData().version()) { builder.metaData(currentState.metaData()); } else { // if its not the same version, only copy over new indices or ones that changed the version - MetaData.Builder metaDataBuilder = MetaData.builder(newState.metaData()).removeAllIndices(); - for (IndexMetaData indexMetaData : newState.metaData()) { + MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices(); + for (IndexMetaData indexMetaData : updatedState.metaData()) { IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); if (currentIndexMetaData == null || currentIndexMetaData.version() != indexMetaData.version()) { metaDataBuilder.put(indexMetaData, false);