diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index 5b05f199dfa..21962fb421b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -52,10 +52,7 @@ import org.elasticsearch.discovery.local.LocalDiscovery; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; +import java.util.*; /** * Represents the current state of the cluster. @@ -296,6 +293,16 @@ public class ClusterState implements ToXContent, Diffable { } } + /** + * a cluster state supersedes another state iff they are from the same master and the version of this state is higher than that of the other state. + *

+ * In essence that means that all the changes from the other cluster state are also reflected by the current one + */ + public boolean supersedes(ClusterState other) { + return this.nodes().masterNodeId() != null && this.nodes().masterNodeId().equals(other.nodes().masterNodeId()) && this.version() > other.version(); + + } + public enum Metric { VERSION("version"), MASTER_NODE("master_node"), @@ -814,6 +821,7 @@ public class ClusterState implements ToXContent, Diffable { builder.fromDiff(true); return builder.build(); } + } } diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index a8d82ccb84b..91c2bf22c1e 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -357,7 +357,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { + if (currentState.supersedes(nodeSpecificClusterState)) { return currentState; } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a38fef4cc71..3887ee4256a 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -19,18 +19,11 @@ package org.elasticsearch.discovery.zen; -import com.google.common.base.Objects; import com.google.common.collect.Sets; +import org.elasticsearch.ElasticsearchException; import 
org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -49,7 +42,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.InitialStateDiscoveryListener; @@ -64,20 +56,12 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Queue; 
import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -199,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName); this.nodesFD.addListener(new NodeFaultDetectionListener()); - this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName); + this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName); this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener()); @@ -358,6 +342,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return joinThreadControl.joinThreadActive(); } + + // used for testing + public ClusterState[] pendingClusterStates() { + return publishClusterState.pendingStatesQueue().pendingClusterStates(); + } + /** * the main function of a join thread. This function is guaranteed to join the cluster * or spawn a new join thread upon failure to do so. @@ -428,7 +418,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join"); } - // Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster } + // Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState } // when the first cluster state arrives. 
joinThreadControl.markThreadAsDone(currentThread); return currentState; @@ -634,9 +624,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen .masterNodeId(null).build(); // flush any pending cluster states from old master, so it will not be set as master again - ArrayList pendingNewClusterStates = new ArrayList<>(); - processNewClusterStates.drainTo(pendingNewClusterStates); - logger.trace("removed [{}] pending cluster states", pendingNewClusterStates.size()); + publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason)); if (rejoinOnMasterGone) { return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")"); @@ -682,171 +670,98 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen }); } - static class ProcessClusterState { - final ClusterState clusterState; - volatile boolean processed; + void processNextPendingClusterState(String reason) { + clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { + ClusterState newClusterState = null; - ProcessClusterState(ClusterState clusterState) { - this.clusterState = clusterState; - } - } + @Override + public ClusterState execute(ClusterState currentState) { + newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess(); - private final BlockingQueue processNewClusterStates = ConcurrentCollections.newBlockingQueue(); - - void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { - final ClusterName incomingClusterName = newClusterState.getClusterName(); - if (localNodeMaster()) { - logger.debug("received cluster state from [{}] which is also master with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName); - final 
ClusterState newState = newClusterState; - clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return handleAnotherMaster(currentState, newState.nodes().masterNode(), newState.version(), "via a new cluster state"); + // all pending states have been processed + if (newClusterState == null) { + return currentState; } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - newStateProcessed.onNewClusterStateProcessed(); + assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; + assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; + + if (currentState.nodes().localNodeMaster()) { + return handleAnotherMaster(currentState, newClusterState.nodes().masterNode(), newClusterState.version(), "via a new cluster state"); } - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - newStateProcessed.onNewClusterStateFailed(t); + if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) { + return currentState; } - }); - } else { + // check to see that we monitor the correct master of the cluster + if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().masterNode())) { + masterFD.restart(newClusterState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); + } + + if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { + // its a fresh update from the master as we transition from a start of not having a master to having one + logger.debug("got first state from fresh 
master [{}]", newClusterState.nodes().masterNodeId()); + long count = clusterJoinsCounter.incrementAndGet(); + logger.trace("updated cluster join cluster to [{}]", count); + + return newClusterState; + } - final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState); - processNewClusterStates.add(processClusterState); + // some optimizations to make sure we keep old objects where possible + ClusterState.Builder builder = ClusterState.builder(newClusterState); - assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; - assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - // we already processed it in a previous event - if (processClusterState.processed) { - return currentState; - } - - // TODO: once improvement that we can do is change the message structure to include version and masterNodeId - // at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page - // to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state - - - ClusterState updatedState = selectNextStateToProcess(processNewClusterStates); - if (updatedState == null) { - updatedState = currentState; - } - if (shouldIgnoreOrRejectNewClusterState(logger, currentState, updatedState)) { - return currentState; - } - - // we don't need to do this, since we ping the master, and get notified when it has moved from being a master - // because it doesn't have enough master nodes... 
- //if (!electMaster.hasEnoughMasterNodes(newState.nodes())) { - // return disconnectFromCluster(newState, "not enough master nodes on new cluster state wreceived from [" + newState.nodes().masterNode() + "]"); - //} - - // check to see that we monitor the correct master of the cluster - if (masterFD.masterNode() == null || !masterFD.masterNode().equals(updatedState.nodes().masterNode())) { - masterFD.restart(updatedState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]"); - } - - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId()); - long count = clusterJoinsCounter.incrementAndGet(); - logger.trace("updated cluster join cluster to [{}]", count); - - return updatedState; - } - - - // some optimizations to make sure we keep old objects where possible - ClusterState.Builder builder = ClusterState.builder(updatedState); - - // if the routing table did not change, use the original one - if (updatedState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - // same for metadata - if (updatedState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } else { - // if its not the same version, only copy over new indices or ones that changed the version - MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices(); - for (IndexMetaData indexMetaData : updatedState.metaData()) { - IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); - if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) && - currentIndexMetaData.version() == 
indexMetaData.version()) { - // safe to reuse - metaDataBuilder.put(currentIndexMetaData, false); - } else { - metaDataBuilder.put(indexMetaData, false); - } + // if the routing table did not change, use the original one + if (newClusterState.routingTable().version() == currentState.routingTable().version()) { + builder.routingTable(currentState.routingTable()); + } + // same for metadata + if (newClusterState.metaData().version() == currentState.metaData().version()) { + builder.metaData(currentState.metaData()); + } else { + // if its not the same version, only copy over new indices or ones that changed the version + MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices(); + for (IndexMetaData indexMetaData : newClusterState.metaData()) { + IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index()); + if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) && + currentIndexMetaData.version() == indexMetaData.version()) { + // safe to reuse + metaDataBuilder.put(currentIndexMetaData, false); + } else { + metaDataBuilder.put(indexMetaData, false); } - builder.metaData(metaDataBuilder); } - - return builder.build(); + builder.metaData(metaDataBuilder); } - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - newStateProcessed.onNewClusterStateFailed(t); - } + return builder.build(); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + if (newClusterState != null) { + try { + publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, t); + } catch (Throwable unexpected) { + logger.error("unexpected exception while failing [{}]", unexpected, source); + } + } + } + + @Override + public 
void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { sendInitialStateEventIfNeeded(); - newStateProcessed.onNewClusterStateProcessed(); + if (newClusterState != null) { + publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState); + } + } catch (Throwable t) { + onFailure(source, t); } - }); - } - } - - /** - * Picks the cluster state with highest version with the same master from the queue. All cluster states with - * lower versions are ignored. If a cluster state with a different master is seen the processing logic stops and the - * last processed state is returned. - */ - static ClusterState selectNextStateToProcess(Queue processNewClusterStates) { - // try and get the state with the highest version out of all the ones with the same master node id - ProcessClusterState stateToProcess = processNewClusterStates.poll(); - if (stateToProcess == null) { - return null; - } - stateToProcess.processed = true; - while (true) { - ProcessClusterState potentialState = processNewClusterStates.peek(); - // nothing else in the queue, bail - if (potentialState == null) { - break; } - // if its not from the same master, then bail - if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) { - break; - } - // we are going to use it for sure, poll (remove) it - potentialState = processNewClusterStates.poll(); - if (potentialState == null) { - // might happen if the queue is drained - break; - } - potentialState.processed = true; - - if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) { - // we found a new one - stateToProcess = potentialState; - } - } - return stateToProcess.clusterState; + }); } /** @@ -857,7 +772,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen */ static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) { 
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState); - if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) { + if (currentState.supersedes(newClusterState)) { // if the new state has a smaller version, and it has the same master node, then no need to process it logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version()); return true; @@ -1073,11 +988,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } } - private class NewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener { + private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener { @Override - public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) { - handleNewClusterStateFromMaster(clusterState, newStateProcessed); + public void onNewClusterState(String reason) { + processNextPendingClusterState(reason); } } @@ -1111,11 +1026,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return; } - // nodes pre 1.4.0 do not send this information - if (pingRequest.masterNode() == null) { - return; - } - if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) { logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get()); return; diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java new file mode 100644 index 00000000000..fc894f3d07e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java @@ -0,0 +1,286 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery.zen.publish; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; + +import java.util.ArrayList; +import java.util.Locale; +import java.util.Objects; + +/** + * A queue that holds all "in-flight" incoming cluster states from the master. Once a master commits a cluster + * state, it is made available via {@link #getNextClusterStateToProcess()}. The class also takes care of batching + * cluster states for processing and failures. + *

+ * The queue is bounded by {@link #maxQueueSize}. When the queue is at capacity and a new cluster state is inserted, + * the oldest cluster state will be dropped. This is safe because: + * 1) Under normal operations, master will publish and commit a cluster state before processing another change (i.e., the queue length is 1) + * 2) If the master fails to commit a change, it will step down, causing a master election, which will flush the queue. + * 3) In general it's safe to process the incoming cluster state as a replacement for the cluster state that's dropped. + * a) If the dropped cluster state is from the same master as the incoming one, it is likely to be superseded by the incoming state (or another state in the queue). + * This fails to hold only in very extreme cases of out-of-order delivery. + * b) If the dropped cluster state is not from the same master, it means that either: + * i) we are no longer following the master of the dropped cluster state but follow the incoming one, or + * ii) we are no longer following any master, in which case it doesn't matter which cluster state will be processed first. + *

+ * The class is fully thread safe and can be used concurrently. + */ +public class PendingClusterStatesQueue { + + interface StateProcessedListener { + + void onNewClusterStateProcessed(); + + void onNewClusterStateFailed(Throwable t); + } + + final ArrayList pendingStates = new ArrayList<>(); + final ESLogger logger; + final int maxQueueSize; + + public PendingClusterStatesQueue(ESLogger logger, int maxQueueSize) { + this.logger = logger; + this.maxQueueSize = maxQueueSize; + } + + /** Add an incoming, not yet committed cluster state */ + public synchronized void addPending(ClusterState state) { + pendingStates.add(new ClusterStateContext(state)); + if (pendingStates.size() > maxQueueSize) { + ClusterStateContext context = pendingStates.remove(0); + logger.warn("dropping pending state [{}]. more than [{}] pending states.", context, maxQueueSize); + if (context.committed()) { + context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)", maxQueueSize)); + } + } + } + + /** + * Mark a previously added cluster state as committed. This will make it available via {@link #getNextClusterStateToProcess()} + * When the cluster state is processed (or failed), the supplied listener will be called + **/ + public synchronized ClusterState markAsCommitted(String stateUUID, StateProcessedListener listener) { + final ClusterStateContext context = findState(stateUUID); + if (context == null) { + listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid [" + stateUUID + "] to commit")); + return null; + } + if (context.committed()) { + listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid [" + stateUUID + "] is already committed")); + return null; + } + context.markAsCommitted(listener); + return context.state; + } + + /** + * mark that the processing of the given state has failed. 
All committed states that are {@link ClusterState#supersedes(ClusterState)}-ed + * by this failed state, will be failed as well + */ + public synchronized void markAsFailed(ClusterState state, Throwable reason) { + final ClusterStateContext failedContext = findState(state.stateUUID()); + if (failedContext == null) { + throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]"); + } + if (failedContext.committed() == false) { + throw new IllegalArgumentException("failed cluster state is not committed " + state); + } + + // fail all committed states which are batch together with the failed state + ArrayList statesToRemove = new ArrayList<>(); + for (int index = 0; index < pendingStates.size(); index++) { + final ClusterStateContext pendingContext = pendingStates.get(index); + if (pendingContext.committed() == false) { + continue; + } + final ClusterState pendingState = pendingContext.state; + if (pendingContext.equals(failedContext)) { + statesToRemove.add(pendingContext); + pendingContext.listener.onNewClusterStateFailed(reason); + } else if (state.supersedes(pendingState)) { + statesToRemove.add(pendingContext); + logger.debug("failing committed state {} together with state {}", pendingContext, failedContext); + pendingContext.listener.onNewClusterStateFailed(reason); + } + } + pendingStates.removeAll(statesToRemove); + assert findState(state.stateUUID()) == null : "state was marked as processed but can still be found in pending list " + state; + } + + /** + * indicates that a cluster state was successfully processed. Any committed state that is {@link ClusterState#supersedes(ClusterState)}-ed + * by the processed state will be marked as processed as well. + *

+ * NOTE: successfully processing a state indicates we are following the master it came from. Any committed state from another master will + * be failed by this method + */ + public synchronized void markAsProcessed(ClusterState state) { + if (findState(state.stateUUID()) == null) { + throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]"); + } + final DiscoveryNode currentMaster = state.nodes().masterNode(); + assert currentMaster != null : "processed cluster state mast have a master. " + state; + + // fail or remove any incoming state from a different master + // respond to any committed state from the same master with same or lower version (we processed a higher version) + ArrayList contextsToRemove = new ArrayList<>(); + for (int index = 0; index < pendingStates.size(); index++) { + final ClusterStateContext pendingContext = pendingStates.get(index); + final ClusterState pendingState = pendingContext.state; + final DiscoveryNode pendingMasterNode = pendingState.nodes().masterNode(); + if (Objects.equals(currentMaster, pendingMasterNode) == false) { + contextsToRemove.add(pendingContext); + if (pendingContext.committed()) { + // this is a committed state , warn + logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one, rejecting (received {}, current {})", + pendingState.stateUUID(), pendingState.version(), + pendingMasterNode, currentMaster); + pendingContext.listener.onNewClusterStateFailed( + new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")") + ); + } else { + logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from [{}] was successfully processed", + pendingState.stateUUID(), pendingState.version(), pendingMasterNode, + currentMaster + ); + } + } else if 
(state.supersedes(pendingState) && pendingContext.committed()) { + logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]", + pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version() + ); + contextsToRemove.add(pendingContext); + pendingContext.listener.onNewClusterStateProcessed(); + } else if (pendingState.stateUUID().equals(state.stateUUID())) { + assert pendingContext.committed() : "processed cluster state is not committed " + state; + contextsToRemove.add(pendingContext); + pendingContext.listener.onNewClusterStateProcessed(); + } + } + // now ack the processed state + pendingStates.removeAll(contextsToRemove); + assert findState(state.stateUUID()) == null : "state was marked as processed but can still be found in pending list " + state; + + } + + ClusterStateContext findState(String stateUUID) { + for (int i = 0; i < pendingStates.size(); i++) { + final ClusterStateContext context = pendingStates.get(i); + if (context.stateUUID().equals(stateUUID)) { + return context; + } + } + return null; + } + + /** clear the incoming queue. any committed state will be failed */ + public synchronized void failAllStatesAndClear(Throwable reason) { + for (ClusterStateContext pendingState : pendingStates) { + if (pendingState.committed()) { + pendingState.listener.onNewClusterStateFailed(reason); + } + } + pendingStates.clear(); + } + + /** + * Gets the next committed state to process. + *

+ * The method tries to batch operations by returning the highest committed cluster state + * which supersedes the first committed state in the queue (i.e., it comes from the same master). + */ + public synchronized ClusterState getNextClusterStateToProcess() { + if (pendingStates.isEmpty()) { + return null; + } + + ClusterStateContext stateToProcess = null; + int index = 0; + for (; index < pendingStates.size(); index++) { + ClusterStateContext potentialState = pendingStates.get(index); + if (potentialState.committed()) { + stateToProcess = potentialState; + break; + } + } + if (stateToProcess == null) { + return null; + } + + // now try to find the highest committed state from the same master + for (; index < pendingStates.size(); index++) { + ClusterStateContext potentialState = pendingStates.get(index); + + if (potentialState.state.supersedes(stateToProcess.state) && potentialState.committed()) { + // we found a new one + stateToProcess = potentialState; + } + } + assert stateToProcess.committed() : "should only return committed cluster state.
found " + stateToProcess.state; + return stateToProcess.state; + } + + /** returns all pending states, committed or not */ + public synchronized ClusterState[] pendingClusterStates() { + ArrayList states = new ArrayList<>(); + for (ClusterStateContext context : pendingStates) { + states.add(context.state); + } + return states.toArray(new ClusterState[states.size()]); + } + + static class ClusterStateContext { + final ClusterState state; + StateProcessedListener listener; + + ClusterStateContext(ClusterState clusterState) { + this.state = clusterState; + } + + void markAsCommitted(StateProcessedListener listener) { + if (this.listener != null) { + throw new IllegalStateException(toString() + "is already committed"); + } + this.listener = listener; + } + + boolean committed() { + return listener != null; + } + + public String stateUUID() { + return state.stateUUID(); + } + + @Override + public String toString() { + return String.format( + Locale.ROOT, + "[uuid[%s], v[%d], m[%s]]", + stateUUID(), + state.version(), + state.nodes().masterNodeId() + ); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index c9fe8d688ff..4da6e42601b 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -57,32 +57,30 @@ public class PublishClusterStateAction extends AbstractComponent { public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send"; public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit"; - public interface NewClusterStateListener { + public static final String SETTINGS_MAX_PENDING_CLUSTER_STATES = "discovery.zen.publish.max_pending_cluster_states"; - interface NewStateProcessed { + public interface 
NewPendingClusterStateListener { - void onNewClusterStateProcessed(); - - void onNewClusterStateFailed(Throwable t); - } - - void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed); + /** a new cluster state has been committed and is ready to process via {@link #pendingStatesQueue()} */ + void onNewClusterState(String reason); } private final TransportService transportService; private final DiscoveryNodesProvider nodesProvider; - private final NewClusterStateListener listener; + private final NewPendingClusterStateListener newPendingClusterStatelistener; private final DiscoverySettings discoverySettings; private final ClusterName clusterName; + private final PendingClusterStatesQueue pendingStatesQueue; public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, - NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings); this.transportService = transportService; this.nodesProvider = nodesProvider; - this.listener = listener; + this.newPendingClusterStatelistener = listener; this.discoverySettings = discoverySettings; this.clusterName = clusterName; + this.pendingStatesQueue = new PendingClusterStatesQueue(logger, settings.getAsInt(SETTINGS_MAX_PENDING_CLUSTER_STATES, 25)); transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new SendClusterStateRequestHandler()); transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest.class, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler()); } @@ -92,6 +90,10 @@ public class PublishClusterStateAction extends AbstractComponent { transportService.removeHandler(COMMIT_ACTION_NAME); } + public PendingClusterStatesQueue pendingStatesQueue() { + return pendingStatesQueue; + } + /** * 
publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will * be processed by the master and the other nodes. @@ -359,6 +361,7 @@ public class PublishClusterStateAction extends AbstractComponent { // sanity check incoming state validateIncomingState(incomingState, lastSeenClusterState); + pendingStatesQueue.addPending(incomingState); lastSeenClusterState = incomingState; lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); } @@ -382,56 +385,34 @@ public class PublishClusterStateAction extends AbstractComponent { logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode()); throw new IllegalStateException("received state from a node that is not part of the cluster"); } - // state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery) - if (currentNodes.localNodeMaster() == false) { - ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); - } - if (lastSeenClusterState != null - && Objects.equals(lastSeenClusterState.nodes().masterNodeId(), incomingState.nodes().masterNodeId()) - && lastSeenClusterState.version() > incomingState.version()) { - logger.debug("received an older cluster state from master, rejecting (received version [{}], last version is [{}])", - incomingState.version(), lastSeenClusterState.version()); - throw new IllegalStateException("cluster state version [" + incomingState.version() + "] is old (last seen version [" + lastSeenClusterState.version() + "])"); - } + ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState); } protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) { - ClusterState committedClusterState; - synchronized (lastSeenClusterStateMutex) { - committedClusterState = lastSeenClusterState; - } - - // if this message somehow 
comes without a previous send, we won't have a cluster state - String lastSeenUUID = committedClusterState == null ? null : committedClusterState.stateUUID(); - if (request.stateUUID.equals(lastSeenUUID) == false) { - throw new IllegalStateException("tried to commit cluster state UUID [" + request.stateUUID + "], but last seen UUID is [" + lastSeenUUID + "]"); - } - - try { - listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() { - @Override - public void onNewClusterStateProcessed() { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } catch (Throwable e) { - logger.debug("failed to send response on cluster state processed", e); - onNewClusterStateFailed(e); - } + final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, new PendingClusterStatesQueue.StateProcessedListener() { + @Override + public void onNewClusterStateProcessed() { + try { + // send a response to the master to indicate that this cluster state has been processed post committing it. 
+ channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (Throwable e) { + logger.debug("failed to send response on cluster state processed", e); + onNewClusterStateFailed(e); } + } - @Override - public void onNewClusterStateFailed(Throwable t) { - try { - channel.sendResponse(t); - } catch (Throwable e) { - logger.debug("failed to send response on cluster state processed", e); - } + @Override + public void onNewClusterStateFailed(Throwable t) { + try { + channel.sendResponse(t); + } catch (Throwable e) { + logger.debug("failed to send response on cluster state processed", e); } - }); - } catch (Exception e) { - logger.warn("unexpected error while processing cluster state version [{}]", e, lastSeenClusterState.version()); - throw e; + } + }); + if (state != null) { + newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().masterNode() + " committed version [" + state.version() + "]"); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java new file mode 100644 index 00000000000..19f90f2962c --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ClusterStateTests extends ESTestCase { + + public void testSupersedes() { + final DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT); + final DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT); + final DiscoveryNodes nodes = DiscoveryNodes.builder().put(node1).put(node2).build(); + ClusterState noMaster1 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(nodes).build(); + ClusterState noMaster2 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(nodes).build(); + ClusterState withMaster1a = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node1.id())).build(); + ClusterState withMaster1b = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node1.id())).build(); + ClusterState withMaster2 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node2.id())).build(); + + // states with no master should never supersede anything + assertFalse(noMaster1.supersedes(noMaster2)); + assertFalse(noMaster1.supersedes(withMaster1a)); + + // states should never supersede states from another master + assertFalse(withMaster1a.supersedes(withMaster2)); + assertFalse(withMaster1a.supersedes(noMaster1)); + + // state from the same master compare by 
version + assertThat(withMaster1a.supersedes(withMaster1b), equalTo(withMaster1a.version() > withMaster1b.version())); + + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java index 84b31e3be30..987b8295310 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ack/AckIT.java @@ -19,6 +19,9 @@ package org.elasticsearch.cluster.ack; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; @@ -34,8 +37,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.settings.Settings; @@ -44,9 +47,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.warmer.IndexWarmersMetaData; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Test; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; import java.util.concurrent.TimeUnit; @@ -72,7 +72,7 @@ public class AckIT extends ESIntegTestCase { createIndex("test"); 
assertAcked(client().admin().indices().prepareUpdateSettings("test") - .setSettings(Settings.builder().put("refresh_interval", 9999, TimeUnit.MILLISECONDS))); + .setSettings(Settings.builder().put("refresh_interval", 9999, TimeUnit.MILLISECONDS))); for (Client client : clients()) { String refreshInterval = getLocalClusterState(client).metaData().index("test").settings().get("index.refresh_interval"); @@ -178,9 +178,9 @@ public class AckIT extends ESIntegTestCase { @Test public void testClusterRerouteAcknowledgement() throws InterruptedException { assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(indexSettings()) - .put(SETTING_NUMBER_OF_SHARDS, between(cluster().numDataNodes(), DEFAULT_MAX_NUM_SHARDS)) - .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(indexSettings()) + .put(SETTING_NUMBER_OF_SHARDS, between(cluster().numDataNodes(), DEFAULT_MAX_NUM_SHARDS)) + .put(SETTING_NUMBER_OF_REPLICAS, 0) )); ensureGreen(); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 6d4b0a9ee45..e3c97a74f67 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -22,8 +22,6 @@ package org.elasticsearch.discovery; import com.google.common.base.Predicate; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; @@ -54,7 +52,11 @@ import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration; import org.elasticsearch.test.disruption.*; import 
org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -734,7 +736,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { */ @Test public void unicastSinglePingResponseContainsMaster() throws Exception { - List nodes = startCluster(4, -1, new int[] {0}); + List nodes = startCluster(4, -1, new int[]{0}); // Figure out what is the elected master node final String masterNode = internalCluster().getMasterName(); logger.info("---> legit elected master node=" + masterNode); @@ -853,6 +855,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { nonMasterTransportService.clearRule(discoveryNodes.masterNode()); ensureStableCluster(2); + + // shutting down the nodes, to avoid the leakage check tripping + // on the states associated with the commit requests we may have dropped + internalCluster().stopRandomNonMasterNode(); } @@ -943,7 +949,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { @Test public void testIndicesDeleted() throws Exception { configureUnicastCluster(3, null, 2); - Future> masterNodes= internalCluster().startMasterOnlyNodesAsync(2); + Future> masterNodes = internalCluster().startMasterOnlyNodesAsync(2); Future dataNode = internalCluster().startDataOnlyNodeAsync(); dataNode.get(); masterNodes.get(); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java index 61b1062dbed..707ff87960e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java +++ 
b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java @@ -27,14 +27,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.test.ESTestCase; -import java.util.Collections; -import java.util.LinkedList; -import java.util.Queue; - -import static org.elasticsearch.discovery.zen.ZenDiscovery.ProcessClusterState; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; -import static org.hamcrest.Matchers.*; -import static org.hamcrest.core.IsNull.nullValue; +import static org.hamcrest.Matchers.containsString; /** */ @@ -95,53 +89,4 @@ public class ZenDiscoveryUnitTest extends ESTestCase { } assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build())); } - - public void testSelectNextStateToProcess_empty() { - Queue queue = new LinkedList<>(); - assertThat(ZenDiscovery.selectNextStateToProcess(queue), nullValue()); - } - - public void testSelectNextStateToProcess() { - ClusterName clusterName = new ClusterName("abc"); - DiscoveryNodes nodes = DiscoveryNodes.builder().masterNodeId("a").build(); - - int numUpdates = scaledRandomIntBetween(50, 100); - LinkedList queue = new LinkedList<>(); - for (int i = 0; i < numUpdates; i++) { - queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build())); - } - ProcessClusterState mostRecent = queue.get(numUpdates - 1); - Collections.shuffle(queue, getRandom()); - - assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState)); - assertThat(mostRecent.processed, is(true)); - assertThat(queue.size(), equalTo(0)); - } - - public void testSelectNextStateToProcess_differentMasters() { - ClusterName clusterName = new ClusterName("abc"); - DiscoveryNodes nodes1 = DiscoveryNodes.builder().masterNodeId("a").build(); 
- DiscoveryNodes nodes2 = DiscoveryNodes.builder().masterNodeId("b").build(); - - LinkedList queue = new LinkedList<>(); - ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build()); - queue.offer(thirdMostRecent); - ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build()); - queue.offer(secondMostRecent); - ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build()); - queue.offer(mostRecent); - Collections.shuffle(queue, getRandom()); - queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build())); - queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build())); - - - assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState)); - assertThat(thirdMostRecent.processed, is(true)); - assertThat(secondMostRecent.processed, is(true)); - assertThat(mostRecent.processed, is(true)); - assertThat(queue.size(), equalTo(2)); - assertThat(queue.get(0).processed, is(false)); - assertThat(queue.get(1).processed, is(false)); - } - } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java new file mode 100644 index 00000000000..a8e9f00eb7f --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java @@ -0,0 +1,224 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery.zen.publish; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.discovery.zen.publish.PendingClusterStatesQueue.ClusterStateContext; +import org.elasticsearch.test.ESTestCase; + +import java.util.*; + +import static org.hamcrest.Matchers.*; + +public class PendingClusterStatesQueueTests extends ESTestCase { + + public void testSelectNextStateToProcess_empty() { + PendingClusterStatesQueue queue = new PendingClusterStatesQueue(logger, randomIntBetween(1, 200)); + assertThat(queue.getNextClusterStateToProcess(), nullValue()); + } + + public void testDroppingStatesAtCapacity() { + List states = randomStates(scaledRandomIntBetween(10, 300), "master1", "master2", "master3", "master4"); + Collections.shuffle(states, random()); + // insert half of the states + final int numberOfStateToDrop = states.size() / 2; + List stateToDrop = states.subList(0, numberOfStateToDrop); + final int queueSize = states.size() - numberOfStateToDrop; + PendingClusterStatesQueue queue = createQueueWithStates(stateToDrop, queueSize); + List 
committedContexts = randomCommitStates(queue); + for (ClusterState state : states.subList(numberOfStateToDrop, states.size())) { + queue.addPending(state); + } + + assertThat(queue.pendingClusterStates().length, equalTo(queueSize)); + // check all committed states got a failure due to the drop + for (ClusterStateContext context : committedContexts) { + assertThat(((MockListener) context.listener).failure, notNullValue()); + } + + // all states that should have dropped are indeed dropped. + for (ClusterState state : stateToDrop) { + assertThat(queue.findState(state.stateUUID()), nullValue()); + } + + } + + public void testSimpleQueueSameMaster() { + final int numUpdates = scaledRandomIntBetween(50, 100); + List states = randomStates(numUpdates, "master"); + Collections.shuffle(states, random()); + PendingClusterStatesQueue queue; + queue = createQueueWithStates(states); + + // no state is committed yet + assertThat(queue.getNextClusterStateToProcess(), nullValue()); + + ClusterState highestCommitted = null; + for (ClusterStateContext context : randomCommitStates(queue)) { + if (highestCommitted == null || context.state.supersedes(highestCommitted)) { + highestCommitted = context.state; + } + } + + assertThat(queue.getNextClusterStateToProcess(), sameInstance(highestCommitted)); + + queue.markAsProcessed(highestCommitted); + + // now there is nothing more to process + assertThat(queue.getNextClusterStateToProcess(), nullValue()); + } + + public void testProcessedStateCleansStatesFromOtherMasters() { + List states = randomStates(scaledRandomIntBetween(10, 300), "master1", "master2", "master3", "master4"); + PendingClusterStatesQueue queue = createQueueWithStates(states); + List committedContexts = randomCommitStates(queue); + ClusterState randomCommitted = randomFrom(committedContexts).state; + queue.markAsProcessed(randomCommitted); + final String processedMaster = randomCommitted.nodes().masterNodeId(); + + // now check that queue doesn't contain anything pending 
from another master + for (ClusterStateContext context : queue.pendingStates) { + final String pendingMaster = context.state.nodes().masterNodeId(); + assertThat("found a cluster state from [" + pendingMaster + + "], after a state from [" + processedMaster + "] was proccessed", + pendingMaster, equalTo(processedMaster)); + } + // and check all committed contexts from another master were failed + for (ClusterStateContext context : committedContexts) { + if (context.state.nodes().masterNodeId().equals(processedMaster) == false) { + assertThat(((MockListener) context.listener).failure, notNullValue()); + } + } + } + + public void testFailedStateCleansSupersededStatesOnly() { + List states = randomStates(scaledRandomIntBetween(10, 50), "master1", "master2", "master3", "master4"); + PendingClusterStatesQueue queue = createQueueWithStates(states); + List committedContexts = randomCommitStates(queue); + ClusterState toFail = randomFrom(committedContexts).state; + queue.markAsFailed(toFail, new ElasticsearchException("boo!")); + final Map committedContextsById = new HashMap<>(); + for (ClusterStateContext context : committedContexts) { + committedContextsById.put(context.stateUUID(), context); + } + + // now check that queue doesn't contain superseded states + for (ClusterStateContext context : queue.pendingStates) { + if (context.committed()) { + assertFalse("found a committed cluster state, which is superseded by a failed state.\nFound:" + context.state + "\nfailed:" + toFail, + toFail.supersedes(context.state)); + } + } + // check no state has been erroneously removed + for (ClusterState state : states) { + ClusterStateContext pendingContext = queue.findState(state.stateUUID()); + if (pendingContext != null) { + continue; + } + if (state.equals(toFail)) { + continue; + } + assertThat("non-committed states should never be removed", committedContextsById, hasKey(state.stateUUID())); + final ClusterStateContext context = committedContextsById.get(state.stateUUID()); + 
assertThat("removed state is not superseded by failed state. \nRemoved state:" + context + "\nfailed: " + toFail, + toFail.supersedes(context.state), equalTo(true)); + assertThat("removed state was failed with wrong exception", ((MockListener) context.listener).failure, notNullValue()); + assertThat("removed state was failed with wrong exception", ((MockListener) context.listener).failure.getMessage(), containsString("boo")); + } + } + + public void testFailAllAndClear() { + List states = randomStates(scaledRandomIntBetween(10, 50), "master1", "master2", "master3", "master4"); + PendingClusterStatesQueue queue = createQueueWithStates(states); + List committedContexts = randomCommitStates(queue); + queue.failAllStatesAndClear(new ElasticsearchException("boo!")); + assertThat(queue.pendingStates, empty()); + assertThat(queue.getNextClusterStateToProcess(), nullValue()); + for (ClusterStateContext context : committedContexts) { + assertThat("state was failed with wrong exception", ((MockListener) context.listener).failure, notNullValue()); + assertThat("state was failed with wrong exception", ((MockListener) context.listener).failure.getMessage(), containsString("boo")); + } + } + + protected List randomCommitStates(PendingClusterStatesQueue queue) { + List committedContexts = new ArrayList<>(); + for (int iter = randomInt(queue.pendingStates.size() - 1); iter >= 0; iter--) { + ClusterState state = queue.markAsCommitted(randomFrom(queue.pendingStates).stateUUID(), new MockListener()); + if (state != null) { + // null cluster state means we committed twice + committedContexts.add(queue.findState(state.stateUUID())); + } + } + return committedContexts; + } + + PendingClusterStatesQueue createQueueWithStates(List states) { + return createQueueWithStates(states, states.size() * 2); // we don't care about limits (there are dedicated tests for that) + } + + PendingClusterStatesQueue createQueueWithStates(List states, int maxQueueSize) { + PendingClusterStatesQueue queue; + 
queue = new PendingClusterStatesQueue(logger, maxQueueSize); + for (ClusterState state : states) { + queue.addPending(state); + } + return queue; + } + + List randomStates(int count, String... masters) { + ArrayList states = new ArrayList<>(count); + ClusterState[] lastClusterStatePerMaster = new ClusterState[masters.length]; + for (; count > 0; count--) { + int masterIndex = randomInt(masters.length - 1); + ClusterState state = lastClusterStatePerMaster[masterIndex]; + if (state == null) { + state = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder() + .put(new DiscoveryNode(masters[masterIndex], DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId(masters[masterIndex]).build() + ).build(); + } else { + state = ClusterState.builder(state).incrementVersion().build(); + } + states.add(state); + lastClusterStatePerMaster[masterIndex] = state; + } + return states; + } + + static class MockListener implements PendingClusterStatesQueue.StateProcessedListener { + volatile boolean processed; + volatile Throwable failure; + + @Override + public void onNewClusterStateProcessed() { + processed = true; + } + + @Override + public void onNewClusterStateFailed(Throwable t) { + failure = t; + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 1a078171fdd..b9b41098877 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.bytes.BytesReference; import 
org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -52,10 +51,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -71,7 +67,7 @@ public class PublishClusterStateActionTests extends ESTestCase { protected ThreadPool threadPool; protected Map nodes = newHashMap(); - public static class MockNode implements PublishClusterStateAction.NewClusterStateListener, DiscoveryNodesProvider { + public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener, DiscoveryNodesProvider { public final DiscoveryNode discoveryNode; public final MockTransportService service; public MockPublishAction action; @@ -89,19 +85,33 @@ public class PublishClusterStateActionTests extends ESTestCase { this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.id()).build()).build(); } + public MockNode setAsMaster() { + this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(discoveryNode.id())).build(); + return this; + } + + public MockNode resetMasterId() { + this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(null)).build(); + return this; + } + + public void connectTo(DiscoveryNode node) { service.connectToNode(node); } @Override - public void onNewClusterState(ClusterState newClusterState, NewStateProcessed newStateProcessed) { + public void onNewClusterState(String reason) { + ClusterState newClusterState = 
action.pendingStatesQueue().getNextClusterStateToProcess(); logger.debug("[{}] received version [{}], uuid [{}]", discoveryNode.name(), newClusterState.version(), newClusterState.stateUUID()); if (listener != null) { ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState); listener.clusterChanged(event); } - clusterState = newClusterState; - newStateProcessed.onNewClusterStateProcessed(); + if (clusterState.nodes().masterNode() == null || newClusterState.supersedes(clusterState)) { + clusterState = newClusterState; + } + action.pendingStatesQueue().markAsProcessed(newClusterState); } @Override @@ -211,22 +221,21 @@ public class PublishClusterStateActionTests extends ESTestCase { } protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider, - PublishClusterStateAction.NewClusterStateListener listener) { + PublishClusterStateAction.NewPendingClusterStateListener listener) { DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings)); return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT); } @Test public void testSimpleClusterStatePublishing() throws Exception { - MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT); + MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT).setAsMaster(); MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT); // Initial cluster state - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); + ClusterState clusterState = nodeA.clusterState; // cluster state update - add nodeB - discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build(); + 
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).put(nodeB.discoveryNode).build(); ClusterState previousClusterState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); @@ -277,6 +286,11 @@ public class PublishClusterStateActionTests extends ESTestCase { assertSameStateFromFull(nodeC.clusterState, clusterState); assertFalse(nodeC.clusterState.wasReadFromDiff()); + // node A steps down from being master + nodeA.resetMasterId(); + nodeB.resetMasterId(); + nodeC.resetMasterId(); + // node B becomes the master and sends a version of the cluster state that goes back discoveryNodes = DiscoveryNodes.builder(discoveryNodes) .put(nodeA.discoveryNode) @@ -300,12 +314,12 @@ public class PublishClusterStateActionTests extends ESTestCase { public void clusterChanged(ClusterChangedEvent event) { fail("Shouldn't send cluster state to myself"); } - }); + }).setAsMaster(); MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); @@ -337,7 +351,7 @@ public class PublishClusterStateActionTests extends ESTestCase { }); // Initial cluster state - DiscoveryNodes discoveryNodes = 
DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).masterNodeId(nodeA.discoveryNode.id()).build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); // cluster state update - add nodeB @@ -354,16 +368,21 @@ public class PublishClusterStateActionTests extends ESTestCase { /** - * Test not waiting publishing works correctly (i.e., publishing times out) + * Test not waiting on publishing works correctly (i.e., publishing times out) */ @Test public void testSimultaneousClusterStatePublishing() throws Exception { int numberOfNodes = randomIntBetween(2, 10); int numberOfIterations = scaledRandomIntBetween(5, 50); Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build(); - DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); - MockNode master = null; - for (int i = 0; i < numberOfNodes; i++) { + MockNode master = createMockNode("node0", settings, Version.CURRENT, new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + assertProperMetaDataForVersion(event.state().metaData(), event.state().version()); + } + }).setAsMaster(); + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(master.nodes()); + for (int i = 1; i < numberOfNodes; i++) { final String name = "node" + i; final MockNode node = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() { @Override @@ -371,14 +390,10 @@ public class PublishClusterStateActionTests extends ESTestCase { assertProperMetaDataForVersion(event.state().metaData(), event.state().version()); } }); - if (i == 0) { - master = node; - } discoveryNodesBuilder.put(node.discoveryNode); } AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations]; - 
discoveryNodesBuilder.localNodeId(master.discoveryNode.id()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); MetaData metaData = MetaData.EMPTY_META_DATA; ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build(); @@ -398,8 +413,7 @@ public class PublishClusterStateActionTests extends ESTestCase { master.clusterState = clusterState; for (MockNode node : nodes.values()) { - assertThat(node.discoveryNode + " misses a cluster state", node.clusterState, notNullValue()); - assertThat(node.discoveryNode + " unexpected cluster state: " + node.clusterState, node.clusterState.version(), equalTo(clusterState.version())); + assertSameState(node.clusterState, clusterState); assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode)); } } @@ -412,12 +426,12 @@ public class PublishClusterStateActionTests extends ESTestCase { public void clusterChanged(ClusterChangedEvent event) { fail("Shouldn't send cluster state to myself"); } - }); + }).setAsMaster(); MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build(); ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build(); ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build(); publishStateAndWait(nodeA.action, clusterState, previousClusterState); @@ -612,69 +626,59 @@ public class PublishClusterStateActionTests extends ESTestCase { } catch (IllegalStateException OK) { } - logger.info("--> testing rejection of an old cluster state"); + logger.info("--> 
testing acceptance of an old cluster state"); state = node.clusterState; node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build(); - try { - node.action.validateIncomingState(state, node.clusterState); - fail("node accepted state with an older version"); - } catch (IllegalStateException OK) { - } + node.action.validateIncomingState(state, node.clusterState); - // an older version from a *new* master is OK! + // an older version from a *new* master is also OK! ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build(); state = ClusterState.builder(node.clusterState) .nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId("_new_master_").build()) .build(); // remove the master of the node (but still have a previous cluster state with it)! - node.clusterState = ClusterState.builder(node.clusterState) - .nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId(null).build()) - .build(); + node.resetMasterId(); node.action.validateIncomingState(state, previousState); } public void testInterleavedPublishCommit() throws Throwable { - MockNode node = createMockNode("node"); - final ClusterState state1 = ClusterState.builder(node.clusterState).incrementVersion().build(); - final ClusterState state2 = ClusterState.builder(state1).incrementVersion().build(); - final BytesReference state1Bytes = PublishClusterStateAction.serializeFullClusterState(state1, Version.CURRENT); - final BytesReference state2Bytes = PublishClusterStateAction.serializeFullClusterState(state2, Version.CURRENT); + MockNode node = createMockNode("node").setAsMaster(); final CapturingTransportChannel channel = new CapturingTransportChannel(); - node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state1Bytes, Version.CURRENT), channel); - assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); - assertThat(channel.error.get(), nullValue()); - 
channel.clear(); + List states = new ArrayList<>(); + final int numOfStates = scaledRandomIntBetween(3, 10); + for (int i = 1; i <= numOfStates; i++) { + states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build()); + } - // another incoming state is OK. Should just override pending state - node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state2Bytes, Version.CURRENT), channel); - assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); - assertThat(channel.error.get(), nullValue()); - channel.clear(); + final ClusterState finalState = states.get(numOfStates - 1); + Collections.shuffle(states, random()); - // committing previous state should fail - try { - node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state1.stateUUID()), channel); - // sadly, there are ways to percolate errors - assertThat(channel.response.get(), nullValue()); - assertThat(channel.error.get(), notNullValue()); - if (channel.error.get() instanceof IllegalStateException == false) { + logger.info("--> publishing states"); + for (ClusterState state : states) { + node.action.handleIncomingClusterStateRequest( + new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT), + channel); + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + assertThat(channel.error.get(), nullValue()); + channel.clear(); + } + + logger.info("--> committing states"); + + Collections.shuffle(states, random()); + for (ClusterState state : states) { + node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel); + assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); + if (channel.error.get() != null) { throw channel.error.get(); } - } catch (IllegalStateException OK) { 
- } channel.clear(); - // committing second state should succeed - node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state2.stateUUID()), channel); - assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE)); - assertThat(channel.error.get(), nullValue()); - channel.clear(); - - // now check it was really committed - assertSameState(node.clusterState, state2); + //now check the last state held + assertSameState(node.clusterState, finalState); } /** @@ -809,7 +813,7 @@ public class PublishClusterStateActionTests extends ESTestCase { AtomicBoolean timeoutOnCommit = new AtomicBoolean(); AtomicBoolean errorOnCommit = new AtomicBoolean(); - public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { + public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) { super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName); } diff --git a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java index 55fae7fc3fd..0398aae4493 100644 --- a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java @@ -91,6 +91,8 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; 
import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexService; @@ -126,33 +128,15 @@ import org.junit.BeforeClass; import java.io.IOException; import java.io.InputStream; -import java.lang.annotation.Annotation; -import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import java.lang.annotation.*; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -588,6 +572,20 @@ public abstract class ESIntegTestCase extends ESTestCase { } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); + if (isInternalCluster()) { + // check no pending cluster states are leaked + for (Discovery discovery : internalCluster().getInstances(Discovery.class)) { + if (discovery instanceof ZenDiscovery) { + final ZenDiscovery zenDiscovery = (ZenDiscovery) discovery; + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(zenDiscovery.pendingClusterStates(), emptyArray()); + } + }); + } + } + } beforeIndexDeletion(); cluster().wipe(); // wipe after to make sure we fail in the test that didn't ack the delete if 
(afterClass || currentClusterScope == Scope.TEST) { @@ -1615,7 +1613,6 @@ public abstract class ESIntegTestCase extends ESTestCase { } - private Scope getCurrentClusterScope() { return getCurrentClusterScope(this.getClass()); } @@ -1750,7 +1747,7 @@ public abstract class ESIntegTestCase extends ESTestCase { String nodeMode = InternalTestCluster.configuredNodeMode(); if (noLocal != null && noNetwork != null) { throw new IllegalStateException("Can't suppress both network and local mode"); - } else if (noLocal != null){ + } else if (noLocal != null) { nodeMode = "network"; } else if (noNetwork != null) { nodeMode = "local"; @@ -2042,13 +2039,15 @@ public abstract class ESIntegTestCase extends ESTestCase { */ @Retention(RetentionPolicy.RUNTIME) @Inherited - public @interface SuppressLocalMode {} + public @interface SuppressLocalMode { + } /** * If used the test will never run in network mode */ @Retention(RetentionPolicy.RUNTIME) @Inherited - public @interface SuppressNetworkMode {} + public @interface SuppressNetworkMode { + } }