From 8601529be8f92954b05f982a1d5f6b9e2025bace Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 17 Feb 2014 11:10:49 +0100 Subject: [PATCH] Optimize multiple cluster state processing on receiving nodes Nodes that receive the cluster state, and they have several of those pending, can optimize and try and process potentially only one of those. closes #5139 --- .../service/InternalClusterService.java | 6 +- .../discovery/local/LocalDiscovery.java | 5 ++ .../discovery/zen/ZenDiscovery.java | 83 ++++++++++++++++--- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 5a237fd7e3c..a44914168e2 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -369,9 +369,9 @@ public class InternalClusterService extends AbstractLifecycleComponent implem discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { + return currentState; + } + ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); // if the routing table did not change, use the original one if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 92f59a44545..ca59bfad0bf 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; @@ -43,6 +44,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoverySettings; @@ -64,6 +66,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -527,8 +530,22 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } - void handleNewClusterStateFromMaster(final ClusterState newState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { + static class ProcessClusterState { + final ClusterState clusterState; + final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed; + volatile boolean processed; + + ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { + this.clusterState = clusterState; + this.newStateProcessed = newStateProcessed; + } + } + + private final BlockingQueue processNewClusterStates = ConcurrentCollections.newBlockingQueue(); + + void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { if (master) { + final ClusterState newState = newClusterState; clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -560,17 +577,63 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } else { - if (newState.nodes().localNode() == null) { - logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode()); + if (newClusterState.nodes().localNode() == null) { + logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode()); newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster")); } else { if (currentJoinThread != null) { logger.debug("got a new state from master node, though we are already trying to rejoin the cluster"); } - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); + processNewClusterStates.add(processClusterState); + + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { + // we already processed it in a previous event + if (processClusterState.processed) { + return currentState; + } + + // TODO: once improvement that we can do is change the message structure to include version and masterNodeId + // at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page + // to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state + + + // try and get the state with the highest version out of all the ones with the same master node id + ProcessClusterState stateToProcess = processNewClusterStates.poll(); + if (stateToProcess == null) { + return currentState; + } + stateToProcess.processed = true; + while (true) { + ProcessClusterState potentialState = processNewClusterStates.peek(); + // nothing else in the queue, bail + if (potentialState == null) { + break; + } + // if its not from the same master, then bail + if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) { + break; + } + + // we are going to use it for sure, poll (remove) it + potentialState = processNewClusterStates.poll(); + potentialState.processed = true; + + if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) { + // we found a new one + stateToProcess = potentialState; + } + } + + ClusterState updatedState = stateToProcess.clusterState; + + // if the new state has a smaller version, and it has the same master node, then no need to process it + if (updatedState.version() < currentState.version() && Objects.equal(updatedState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { + return currentState; + } // we don't need to do this, since we ping the master, and get notified when it has moved from being a master // because it doesn't have enough master nodes... @@ -578,25 +641,25 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]"); //} - latestDiscoNodes = newState.nodes(); + latestDiscoNodes = updatedState.nodes(); // check to see that we monitor the correct master of the cluster if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) { masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); } - ClusterState.Builder builder = ClusterState.builder(newState); + ClusterState.Builder builder = ClusterState.builder(updatedState); // if the routing table did not change, use the original one - if (newState.routingTable().version() == currentState.routingTable().version()) { + if (updatedState.routingTable().version() == currentState.routingTable().version()) { builder.routingTable(currentState.routingTable()); } // same for metadata - if (newState.metaData().version() == currentState.metaData().version()) { + if (updatedState.metaData().version() == currentState.metaData().version()) { builder.metaData(currentState.metaData()); } else { // if its not the same version, only copy over new indices or ones that changed the version - MetaData.Builder metaDataBuilder = MetaData.builder(newState.metaData()).removeAllIndices(); - for (IndexMetaData indexMetaData : newState.metaData()) { + MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices(); + for (IndexMetaData indexMetaData : updatedState.metaData()) { IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); if (currentIndexMetaData == null || currentIndexMetaData.version() != indexMetaData.version()) { metaDataBuilder.put(indexMetaData, false);