Discovery: Add a dedicate queue for incoming ClusterStates

The initial implementation of two phase commit based cluster state publishing (#13062) relied on a single in memory "pending" cluster state that is only processed by ZenDiscovery once committed by the master. While this is fine on it's own, it resulted in an issue with acknowledged APIs, such as the open index API, in the extreme case where a node falls behind and receives a commit message after a new cluster state has been published. Specifically:

1) Master receives and acked-API call and publishes cluster state CS1
2) Master waits for a min-master nodes to receives CS1 and commits it.
3) All nodes that have responded to CS1 are sent a commit message, however, node N didn't respond yet
4) Master waits for publish timeout (defaults to 30s) for all nodes to process the commit. Node N fails to do so.
5) Master publishes a cluster state CS2. Node N responds to cluster state CS1's publishing but receives cluster state CS2 before the commit for CS1 arrives.
6) The commit message for cluster CS1 is processed on node N, but fails because CS2 is pending. This caused the acked API in step 1 to return (but CS2 , is not yet processed).

In this case, the action indicated by CS1 is not yet executed on node N and therefore the acked API calls return pre-maturely. Note that once CS2 is processed but the change in CS1 takes effect (cluster state operations are safe to batch and we do so all the time).

An example failure can be found on: http://build-us-00.elastic.co/job/es_feature_two_phase_pub/314/

This commit extracts the already existing pending cluster state queue (processNewClusterStates) from ZenDiscovery into it's own class, which serves as a temporary container for in-flight cluster states. Once committed the cluster states are transferred to ZenDiscovery as they used to before. This allows "lagging" cluster states to still be successfully committed and processed (and likely to be ignored as a newer cluster state has already been processed).

As a side effect, all batching logic is now extracted from ZenDiscovery and is unit tested.
This commit is contained in:
Boaz Leskes 2015-09-01 15:39:00 +02:00
parent 218979da1b
commit 80b59e0d66
12 changed files with 820 additions and 403 deletions

View File

@ -52,10 +52,7 @@ import org.elasticsearch.discovery.local.LocalDiscovery;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.*;
/**
* Represents the current state of the cluster.
@ -296,6 +293,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
}
}
/**
* a cluster state supersedes another state iff they are from the same master and the version this state is higher thant the other state.
* <p/>
* In essence that means that all the changes from the other cluster state are also reflected by the current one
*/
public boolean supersedes(ClusterState other) {
return this.nodes().masterNodeId() != null && this.nodes().masterNodeId().equals(other.nodes().masterNodeId()) && this.version() > other.version();
}
public enum Metric {
VERSION("version"),
MASTER_NODE("master_node"),
@ -814,6 +821,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
builder.fromDiff(true);
return builder.build();
}
}
}

View File

@ -357,7 +357,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
if (currentState.supersedes(nodeSpecificClusterState)) {
return currentState;
}

View File

@ -19,18 +19,11 @@
package org.elasticsearch.discovery.zen;
import com.google.common.base.Objects;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -49,7 +42,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
@ -64,20 +56,12 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -199,7 +183,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings, clusterName);
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
this.pingService.setPingContextProvider(this);
this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
@ -358,6 +342,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return joinThreadControl.joinThreadActive();
}
// used for testing
public ClusterState[] pendingClusterStates() {
return publishClusterState.pendingStatesQueue().pendingClusterStates();
}
/**
* the main function of a join thread. This function is guaranteed to join the cluster
* or spawn a new join thread upon failure to do so.
@ -428,7 +418,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return currentState;
@ -634,9 +624,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
.masterNodeId(null).build();
// flush any pending cluster states from old master, so it will not be set as master again
ArrayList<ProcessClusterState> pendingNewClusterStates = new ArrayList<>();
processNewClusterStates.drainTo(pendingNewClusterStates);
logger.trace("removed [{}] pending cluster states", pendingNewClusterStates.size());
publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
if (rejoinOnMasterGone) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
@ -682,171 +670,98 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
}
static class ProcessClusterState {
final ClusterState clusterState;
volatile boolean processed;
void processNextPendingClusterState(String reason) {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + reason + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
ClusterState newClusterState = null;
ProcessClusterState(ClusterState clusterState) {
this.clusterState = clusterState;
}
}
@Override
public ClusterState execute(ClusterState currentState) {
newClusterState = publishClusterState.pendingStatesQueue().getNextClusterStateToProcess();
private final BlockingQueue<ProcessClusterState> processNewClusterStates = ConcurrentCollections.newBlockingQueue();
void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
final ClusterName incomingClusterName = newClusterState.getClusterName();
if (localNodeMaster()) {
logger.debug("received cluster state from [{}] which is also master with cluster name [{}]", newClusterState.nodes().masterNode(), incomingClusterName);
final ClusterState newState = newClusterState;
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return handleAnotherMaster(currentState, newState.nodes().masterNode(), newState.version(), "via a new cluster state");
// all pending states have been processed
if (newClusterState == null) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
newStateProcessed.onNewClusterStateProcessed();
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
if (currentState.nodes().localNodeMaster()) {
return handleAnotherMaster(currentState, newClusterState.nodes().masterNode(), newClusterState.version(), "via a new cluster state");
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
newStateProcessed.onNewClusterStateFailed(t);
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
return currentState;
}
});
} else {
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().masterNode())) {
masterFD.restart(newClusterState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("updated cluster join cluster to [{}]", count);
return newClusterState;
}
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState);
processNewClusterStates.add(processClusterState);
// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(newClusterState);
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// we already processed it in a previous event
if (processClusterState.processed) {
return currentState;
}
// TODO: once improvement that we can do is change the message structure to include version and masterNodeId
// at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page
// to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state
ClusterState updatedState = selectNextStateToProcess(processNewClusterStates);
if (updatedState == null) {
updatedState = currentState;
}
if (shouldIgnoreOrRejectNewClusterState(logger, currentState, updatedState)) {
return currentState;
}
// we don't need to do this, since we ping the master, and get notified when it has moved from being a master
// because it doesn't have enough master nodes...
//if (!electMaster.hasEnoughMasterNodes(newState.nodes())) {
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state wreceived from [" + newState.nodes().masterNode() + "]");
//}
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(updatedState.nodes().masterNode())) {
masterFD.restart(updatedState.nodes().masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId());
long count = clusterJoinsCounter.incrementAndGet();
logger.trace("updated cluster join cluster to [{}]", count);
return updatedState;
}
// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(updatedState);
// if the routing table did not change, use the original one
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
// same for metadata
if (updatedState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : updatedState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index());
if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) &&
currentIndexMetaData.version() == indexMetaData.version()) {
// safe to reuse
metaDataBuilder.put(currentIndexMetaData, false);
} else {
metaDataBuilder.put(indexMetaData, false);
}
// if the routing table did not change, use the original one
if (newClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
// same for metadata
if (newClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(newClusterState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : newClusterState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index());
if (currentIndexMetaData != null && currentIndexMetaData.isSameUUID(indexMetaData.indexUUID()) &&
currentIndexMetaData.version() == indexMetaData.version()) {
// safe to reuse
metaDataBuilder.put(currentIndexMetaData, false);
} else {
metaDataBuilder.put(indexMetaData, false);
}
builder.metaData(metaDataBuilder);
}
return builder.build();
builder.metaData(metaDataBuilder);
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
newStateProcessed.onNewClusterStateFailed(t);
}
return builder.build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
if (newClusterState != null) {
try {
publishClusterState.pendingStatesQueue().markAsFailed(newClusterState, t);
} catch (Throwable unexpected) {
logger.error("unexpected exception while failing [{}]", unexpected, source);
}
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
sendInitialStateEventIfNeeded();
newStateProcessed.onNewClusterStateProcessed();
if (newClusterState != null) {
publishClusterState.pendingStatesQueue().markAsProcessed(newClusterState);
}
} catch (Throwable t) {
onFailure(source, t);
}
});
}
}
/**
* Picks the cluster state with highest version with the same master from the queue. All cluster states with
* lower versions are ignored. If a cluster state with a different master is seen the processing logic stops and the
* last processed state is returned.
*/
static ClusterState selectNextStateToProcess(Queue<ProcessClusterState> processNewClusterStates) {
// try and get the state with the highest version out of all the ones with the same master node id
ProcessClusterState stateToProcess = processNewClusterStates.poll();
if (stateToProcess == null) {
return null;
}
stateToProcess.processed = true;
while (true) {
ProcessClusterState potentialState = processNewClusterStates.peek();
// nothing else in the queue, bail
if (potentialState == null) {
break;
}
// if its not from the same master, then bail
if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) {
break;
}
// we are going to use it for sure, poll (remove) it
potentialState = processNewClusterStates.poll();
if (potentialState == null) {
// might happen if the queue is drained
break;
}
potentialState.processed = true;
if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
// we found a new one
stateToProcess = potentialState;
}
}
return stateToProcess.clusterState;
});
}
/**
@ -857,7 +772,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
*/
static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
if (currentState.nodes().masterNodeId() != null && newClusterState.version() < currentState.version()) {
if (currentState.supersedes(newClusterState)) {
// if the new state has a smaller version, and it has the same master node, then no need to process it
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
return true;
@ -1073,11 +988,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
private class NewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener {
private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
handleNewClusterStateFromMaster(clusterState, newStateProcessed);
public void onNewClusterState(String reason) {
processNextPendingClusterState(reason);
}
}
@ -1111,11 +1026,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return;
}
// nodes pre 1.4.0 do not send this information
if (pingRequest.masterNode() == null) {
return;
}
if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) {
logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
return;

View File

@ -0,0 +1,286 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Objects;
/**
* A queue that holds all "in-flight" incoming cluster states from the master. Once a master commits a cluster
* state, it is made available via {@link #getNextClusterStateToProcess()}. The class also takes care of batching
* cluster states for processing and failures.
* <p/>
* The queue is bound by {@link #maxQueueSize}. When the queue is at capacity and a new cluster state is inserted
* the oldest cluster state will be dropped. This is safe because:
* 1) Under normal operations, master will publish & commit a cluster state before processing another change (i.e., the queue length is 1)
* 2) If the master fails to commit a change, it will step down, causing a master election, which will flush the queue.
* 3) In general it's safe to process the incoming cluster state as a replacement to the cluster state that's dropped.
* a) If the dropped cluster is from the same master as the incoming one is, it is likely to be superseded by the incoming state (or another state in the queue).
* This is only not true in very extreme cases of out of order delivery.
* b) If the dropping cluster state is not from the same master, it means that:
* i) we are no longer following the master of the dropped cluster state but follow the incoming one
* ii) we are no longer following any master, in which case it doesn't matter which cluster state will be processed first.
* <p/>
* The class is fully thread safe and can be used concurrently.
*/
public class PendingClusterStatesQueue {
interface StateProcessedListener {
void onNewClusterStateProcessed();
void onNewClusterStateFailed(Throwable t);
}
final ArrayList<ClusterStateContext> pendingStates = new ArrayList<>();
final ESLogger logger;
final int maxQueueSize;
public PendingClusterStatesQueue(ESLogger logger, int maxQueueSize) {
this.logger = logger;
this.maxQueueSize = maxQueueSize;
}
/** Add an incoming, not yet committed cluster state */
public synchronized void addPending(ClusterState state) {
pendingStates.add(new ClusterStateContext(state));
if (pendingStates.size() > maxQueueSize) {
ClusterStateContext context = pendingStates.remove(0);
logger.warn("dropping pending state [{}]. more than [{}] pending states.", context, maxQueueSize);
if (context.committed()) {
context.listener.onNewClusterStateFailed(new ElasticsearchException("too many pending states ([{}] pending)", maxQueueSize));
}
}
}
/**
* Mark a previously added cluster state as committed. This will make it available via {@link #getNextClusterStateToProcess()}
* When the cluster state is processed (or failed), the supplied listener will be called
**/
public synchronized ClusterState markAsCommitted(String stateUUID, StateProcessedListener listener) {
final ClusterStateContext context = findState(stateUUID);
if (context == null) {
listener.onNewClusterStateFailed(new IllegalStateException("can't resolve cluster state with uuid [" + stateUUID + "] to commit"));
return null;
}
if (context.committed()) {
listener.onNewClusterStateFailed(new IllegalStateException("cluster state with uuid [" + stateUUID + "] is already committed"));
return null;
}
context.markAsCommitted(listener);
return context.state;
}
/**
* mark that the processing of the given state has failed. All committed states that are {@link ClusterState#supersedes(ClusterState)}-ed
* by this failed state, will be failed as well
*/
public synchronized void markAsFailed(ClusterState state, Throwable reason) {
final ClusterStateContext failedContext = findState(state.stateUUID());
if (failedContext == null) {
throw new IllegalArgumentException("can't resolve failed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]");
}
if (failedContext.committed() == false) {
throw new IllegalArgumentException("failed cluster state is not committed " + state);
}
// fail all committed states which are batch together with the failed state
ArrayList<ClusterStateContext> statesToRemove = new ArrayList<>();
for (int index = 0; index < pendingStates.size(); index++) {
final ClusterStateContext pendingContext = pendingStates.get(index);
if (pendingContext.committed() == false) {
continue;
}
final ClusterState pendingState = pendingContext.state;
if (pendingContext.equals(failedContext)) {
statesToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateFailed(reason);
} else if (state.supersedes(pendingState)) {
statesToRemove.add(pendingContext);
logger.debug("failing committed state {} together with state {}", pendingContext, failedContext);
pendingContext.listener.onNewClusterStateFailed(reason);
}
}
pendingStates.removeAll(statesToRemove);
assert findState(state.stateUUID()) == null : "state was marked as processed but can still be found in pending list " + state;
}
/**
* indicates that a cluster state was successfully processed. Any committed state that is {@link ClusterState#supersedes(ClusterState)}-ed
* by the processed state will be marked as processed as well.
* <p/>
* NOTE: successfully processing a state indicates we are following the master it came from. Any committed state from another master will
* be failed by this method
*/
public synchronized void markAsProcessed(ClusterState state) {
if (findState(state.stateUUID()) == null) {
throw new IllegalStateException("can't resolve processed cluster state with uuid [" + state.stateUUID() + "], version [" + state.version() + "]");
}
final DiscoveryNode currentMaster = state.nodes().masterNode();
assert currentMaster != null : "processed cluster state mast have a master. " + state;
// fail or remove any incoming state from a different master
// respond to any committed state from the same master with same or lower version (we processed a higher version)
ArrayList<ClusterStateContext> contextsToRemove = new ArrayList<>();
for (int index = 0; index < pendingStates.size(); index++) {
final ClusterStateContext pendingContext = pendingStates.get(index);
final ClusterState pendingState = pendingContext.state;
final DiscoveryNode pendingMasterNode = pendingState.nodes().masterNode();
if (Objects.equals(currentMaster, pendingMasterNode) == false) {
contextsToRemove.add(pendingContext);
if (pendingContext.committed()) {
// this is a committed state , warn
logger.warn("received a cluster state (uuid[{}]/v[{}]) from a different master than the current one, rejecting (received {}, current {})",
pendingState.stateUUID(), pendingState.version(),
pendingMasterNode, currentMaster);
pendingContext.listener.onNewClusterStateFailed(
new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + pendingMasterNode + ", current " + currentMaster + ")")
);
} else {
logger.trace("removing non-committed state with uuid[{}]/v[{}] from [{}] - a state from [{}] was successfully processed",
pendingState.stateUUID(), pendingState.version(), pendingMasterNode,
currentMaster
);
}
} else if (state.supersedes(pendingState) && pendingContext.committed()) {
logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
);
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
} else if (pendingState.stateUUID().equals(state.stateUUID())) {
assert pendingContext.committed() : "processed cluster state is not committed " + state;
contextsToRemove.add(pendingContext);
pendingContext.listener.onNewClusterStateProcessed();
}
}
// now ack the processed state
pendingStates.removeAll(contextsToRemove);
assert findState(state.stateUUID()) == null : "state was marked as processed but can still be found in pending list " + state;
}
ClusterStateContext findState(String stateUUID) {
for (int i = 0; i < pendingStates.size(); i++) {
final ClusterStateContext context = pendingStates.get(i);
if (context.stateUUID().equals(stateUUID)) {
return context;
}
}
return null;
}
/** clear the incoming queue. any committed state will be failed */
public synchronized void failAllStatesAndClear(Throwable reason) {
for (ClusterStateContext pendingState : pendingStates) {
if (pendingState.committed()) {
pendingState.listener.onNewClusterStateFailed(reason);
}
}
pendingStates.clear();
}
/**
* Gets the next committed state to process.
* <p/>
* The method tries to batch operation by getting the cluster state the highest possible committed states
* which succeeds the first committed state in queue (i.e., it comes from the same master).
*/
public synchronized ClusterState getNextClusterStateToProcess() {
if (pendingStates.isEmpty()) {
return null;
}
ClusterStateContext stateToProcess = null;
int index = 0;
for (; index < pendingStates.size(); index++) {
ClusterStateContext potentialState = pendingStates.get(index);
if (potentialState.committed()) {
stateToProcess = potentialState;
break;
}
}
if (stateToProcess == null) {
return null;
}
// now try to find the highest committed state from the same master
for (; index < pendingStates.size(); index++) {
ClusterStateContext potentialState = pendingStates.get(index);
if (potentialState.state.supersedes(stateToProcess.state) && potentialState.committed()) {
// we found a new one
stateToProcess = potentialState;
}
}
assert stateToProcess.committed() : "should only return committed cluster state. found " + stateToProcess.state;
return stateToProcess.state;
}
/** returns all pending states, committed or not */
public synchronized ClusterState[] pendingClusterStates() {
ArrayList<ClusterState> states = new ArrayList<>();
for (ClusterStateContext context : pendingStates) {
states.add(context.state);
}
return states.toArray(new ClusterState[states.size()]);
}
static class ClusterStateContext {
final ClusterState state;
StateProcessedListener listener;
ClusterStateContext(ClusterState clusterState) {
this.state = clusterState;
}
void markAsCommitted(StateProcessedListener listener) {
if (this.listener != null) {
throw new IllegalStateException(toString() + "is already committed");
}
this.listener = listener;
}
boolean committed() {
return listener != null;
}
public String stateUUID() {
return state.stateUUID();
}
@Override
public String toString() {
return String.format(
Locale.ROOT,
"[uuid[%s], v[%d], m[%s]]",
stateUUID(),
state.version(),
state.nodes().masterNodeId()
);
}
}
}

View File

@ -57,32 +57,30 @@ public class PublishClusterStateAction extends AbstractComponent {
public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";
public interface NewClusterStateListener {
public static final String SETTINGS_MAX_PENDING_CLUSTER_STATES = "discovery.zen.publish.max_pending_cluster_states";
interface NewStateProcessed {
public interface NewPendingClusterStateListener {
void onNewClusterStateProcessed();
void onNewClusterStateFailed(Throwable t);
}
void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed);
/** a new cluster state has been committed and is ready to process via {@link #pendingStatesQueue()} */
void onNewClusterState(String reason);
}
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
private final NewPendingClusterStateListener newPendingClusterStatelistener;
private final DiscoverySettings discoverySettings;
private final ClusterName clusterName;
private final PendingClusterStatesQueue pendingStatesQueue;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.newPendingClusterStatelistener = listener;
this.discoverySettings = discoverySettings;
this.clusterName = clusterName;
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, settings.getAsInt(SETTINGS_MAX_PENDING_CLUSTER_STATES, 25));
transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest.class, ThreadPool.Names.SAME, new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest.class, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler());
}
@ -92,6 +90,10 @@ public class PublishClusterStateAction extends AbstractComponent {
transportService.removeHandler(COMMIT_ACTION_NAME);
}
public PendingClusterStatesQueue pendingStatesQueue() {
return pendingStatesQueue;
}
/**
* publishes a cluster change event to other nodes. if at least minMasterNodes acknowledge the change it is committed and will
* be processed by the master and the other nodes.
@ -359,6 +361,7 @@ public class PublishClusterStateAction extends AbstractComponent {
// sanity check incoming state
validateIncomingState(incomingState, lastSeenClusterState);
pendingStatesQueue.addPending(incomingState);
lastSeenClusterState = incomingState;
lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
}
@ -382,56 +385,34 @@ public class PublishClusterStateAction extends AbstractComponent {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().masterNode());
throw new IllegalStateException("received state from a node that is not part of the cluster");
}
// state from another master requires more subtle checks, so we let it pass for now (it will be checked in ZenDiscovery)
if (currentNodes.localNodeMaster() == false) {
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
}
if (lastSeenClusterState != null
&& Objects.equals(lastSeenClusterState.nodes().masterNodeId(), incomingState.nodes().masterNodeId())
&& lastSeenClusterState.version() > incomingState.version()) {
logger.debug("received an older cluster state from master, rejecting (received version [{}], last version is [{}])",
incomingState.version(), lastSeenClusterState.version());
throw new IllegalStateException("cluster state version [" + incomingState.version() + "] is old (last seen version [" + lastSeenClusterState.version() + "])");
}
ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
}
protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
ClusterState committedClusterState;
synchronized (lastSeenClusterStateMutex) {
committedClusterState = lastSeenClusterState;
}
// if this message somehow comes without a previous send, we won't have a cluster state
String lastSeenUUID = committedClusterState == null ? null : committedClusterState.stateUUID();
if (request.stateUUID.equals(lastSeenUUID) == false) {
throw new IllegalStateException("tried to commit cluster state UUID [" + request.stateUUID + "], but last seen UUID is [" + lastSeenUUID + "]");
}
try {
listener.onNewClusterState(committedClusterState, new NewClusterStateListener.NewStateProcessed() {
@Override
public void onNewClusterStateProcessed() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
onNewClusterStateFailed(e);
}
final ClusterState state = pendingStatesQueue.markAsCommitted(request.stateUUID, new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
try {
// send a response to the master to indicate that this cluster state has been processed post committing it.
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
onNewClusterStateFailed(e);
}
}
@Override
public void onNewClusterStateFailed(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
@Override
public void onNewClusterStateFailed(Throwable t) {
try {
channel.sendResponse(t);
} catch (Throwable e) {
logger.debug("failed to send response on cluster state processed", e);
}
});
} catch (Exception e) {
logger.warn("unexpected error while processing cluster state version [{}]", e, lastSeenClusterState.version());
throw e;
}
});
if (state != null) {
newPendingClusterStatelistener.onNewClusterState("master " + state.nodes().masterNode() + " committed version [" + state.version() + "]");
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class ClusterStateTests extends ESTestCase {
public void testSupersedes() {
final DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
final DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT);
final DiscoveryNodes nodes = DiscoveryNodes.builder().put(node1).put(node2).build();
ClusterState noMaster1 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(nodes).build();
ClusterState noMaster2 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(nodes).build();
ClusterState withMaster1a = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node1.id())).build();
ClusterState withMaster1b = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node1.id())).build();
ClusterState withMaster2 = ClusterState.builder(ClusterName.DEFAULT).version(randomInt(5)).nodes(DiscoveryNodes.builder(nodes).masterNodeId(node2.id())).build();
// states with no master should never supersede anything
assertFalse(noMaster1.supersedes(noMaster2));
assertFalse(noMaster1.supersedes(withMaster1a));
// states should never supersede states from another master
assertFalse(withMaster1a.supersedes(withMaster2));
assertFalse(withMaster1a.supersedes(noMaster1));
// state from the same master compare by version
assertThat(withMaster1a.supersedes(withMaster1b), equalTo(withMaster1a.version() > withMaster1b.version()));
}
}

View File

@ -19,6 +19,9 @@
package org.elasticsearch.cluster.ack;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
@ -34,8 +37,8 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.Settings;
@ -44,9 +47,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import java.util.concurrent.TimeUnit;
@ -72,7 +72,7 @@ public class AckIT extends ESIntegTestCase {
createIndex("test");
assertAcked(client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().put("refresh_interval", 9999, TimeUnit.MILLISECONDS)));
.setSettings(Settings.builder().put("refresh_interval", 9999, TimeUnit.MILLISECONDS)));
for (Client client : clients()) {
String refreshInterval = getLocalClusterState(client).metaData().index("test").settings().get("index.refresh_interval");
@ -178,9 +178,9 @@ public class AckIT extends ESIntegTestCase {
@Test
public void testClusterRerouteAcknowledgement() throws InterruptedException {
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, between(cluster().numDataNodes(), DEFAULT_MAX_NUM_SHARDS))
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, between(cluster().numDataNodes(), DEFAULT_MAX_NUM_SHARDS))
.put(SETTING_NUMBER_OF_REPLICAS, 0)
));
ensureGreen();

View File

@ -22,8 +22,6 @@ package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@ -54,7 +52,11 @@ import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -734,7 +736,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
*/
@Test
public void unicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startCluster(4, -1, new int[] {0});
List<String> nodes = startCluster(4, -1, new int[]{0});
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node=" + masterNode);
@ -853,6 +855,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
nonMasterTransportService.clearRule(discoveryNodes.masterNode());
ensureStableCluster(2);
// shutting down the nodes, to avoid the leakage check tripping
// on the states associated with the commit requests we may have dropped
internalCluster().stopRandomNonMasterNode();
}
@ -943,7 +949,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
@Test
public void testIndicesDeleted() throws Exception {
configureUnicastCluster(3, null, 2);
Future<List<String>> masterNodes= internalCluster().startMasterOnlyNodesAsync(2);
Future<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(2);
Future<String> dataNode = internalCluster().startDataOnlyNodeAsync();
dataNode.get();
masterNodes.get();

View File

@ -27,14 +27,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Queue;
import static org.elasticsearch.discovery.zen.ZenDiscovery.ProcessClusterState;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsNull.nullValue;
import static org.hamcrest.Matchers.containsString;
/**
*/
@ -95,53 +89,4 @@ public class ZenDiscoveryUnitTest extends ESTestCase {
}
assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
}
public void testSelectNextStateToProcess_empty() {
Queue<ProcessClusterState> queue = new LinkedList<>();
assertThat(ZenDiscovery.selectNextStateToProcess(queue), nullValue());
}
public void testSelectNextStateToProcess() {
ClusterName clusterName = new ClusterName("abc");
DiscoveryNodes nodes = DiscoveryNodes.builder().masterNodeId("a").build();
int numUpdates = scaledRandomIntBetween(50, 100);
LinkedList<ProcessClusterState> queue = new LinkedList<>();
for (int i = 0; i < numUpdates; i++) {
queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build()));
}
ProcessClusterState mostRecent = queue.get(numUpdates - 1);
Collections.shuffle(queue, getRandom());
assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState));
assertThat(mostRecent.processed, is(true));
assertThat(queue.size(), equalTo(0));
}
public void testSelectNextStateToProcess_differentMasters() {
ClusterName clusterName = new ClusterName("abc");
DiscoveryNodes nodes1 = DiscoveryNodes.builder().masterNodeId("a").build();
DiscoveryNodes nodes2 = DiscoveryNodes.builder().masterNodeId("b").build();
LinkedList<ProcessClusterState> queue = new LinkedList<>();
ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build());
queue.offer(thirdMostRecent);
ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build());
queue.offer(secondMostRecent);
ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build());
queue.offer(mostRecent);
Collections.shuffle(queue, getRandom());
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build()));
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build()));
assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState));
assertThat(thirdMostRecent.processed, is(true));
assertThat(secondMostRecent.processed, is(true));
assertThat(mostRecent.processed, is(true));
assertThat(queue.size(), equalTo(2));
assertThat(queue.get(0).processed, is(false));
assertThat(queue.get(1).processed, is(false));
}
}

View File

@ -0,0 +1,224 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.discovery.zen.publish.PendingClusterStatesQueue.ClusterStateContext;
import org.elasticsearch.test.ESTestCase;
import java.util.*;
import static org.hamcrest.Matchers.*;
public class PendingClusterStatesQueueTests extends ESTestCase {
public void testSelectNextStateToProcess_empty() {
PendingClusterStatesQueue queue = new PendingClusterStatesQueue(logger, randomIntBetween(1, 200));
assertThat(queue.getNextClusterStateToProcess(), nullValue());
}
public void testDroppingStatesAtCapacity() {
List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 300), "master1", "master2", "master3", "master4");
Collections.shuffle(states, random());
// insert half of the states
final int numberOfStateToDrop = states.size() / 2;
List<ClusterState> stateToDrop = states.subList(0, numberOfStateToDrop);
final int queueSize = states.size() - numberOfStateToDrop;
PendingClusterStatesQueue queue = createQueueWithStates(stateToDrop, queueSize);
List<ClusterStateContext> committedContexts = randomCommitStates(queue);
for (ClusterState state : states.subList(numberOfStateToDrop, states.size())) {
queue.addPending(state);
}
assertThat(queue.pendingClusterStates().length, equalTo(queueSize));
// check all committed states got a failure due to the drop
for (ClusterStateContext context : committedContexts) {
assertThat(((MockListener) context.listener).failure, notNullValue());
}
// all states that should have dropped are indeed dropped.
for (ClusterState state : stateToDrop) {
assertThat(queue.findState(state.stateUUID()), nullValue());
}
}
public void testSimpleQueueSameMaster() {
final int numUpdates = scaledRandomIntBetween(50, 100);
List<ClusterState> states = randomStates(numUpdates, "master");
Collections.shuffle(states, random());
PendingClusterStatesQueue queue;
queue = createQueueWithStates(states);
// no state is committed yet
assertThat(queue.getNextClusterStateToProcess(), nullValue());
ClusterState highestCommitted = null;
for (ClusterStateContext context : randomCommitStates(queue)) {
if (highestCommitted == null || context.state.supersedes(highestCommitted)) {
highestCommitted = context.state;
}
}
assertThat(queue.getNextClusterStateToProcess(), sameInstance(highestCommitted));
queue.markAsProcessed(highestCommitted);
// now there is nothing more to process
assertThat(queue.getNextClusterStateToProcess(), nullValue());
}
public void testProcessedStateCleansStatesFromOtherMasters() {
List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 300), "master1", "master2", "master3", "master4");
PendingClusterStatesQueue queue = createQueueWithStates(states);
List<ClusterStateContext> committedContexts = randomCommitStates(queue);
ClusterState randomCommitted = randomFrom(committedContexts).state;
queue.markAsProcessed(randomCommitted);
final String processedMaster = randomCommitted.nodes().masterNodeId();
// now check that queue doesn't contain anything pending from another master
for (ClusterStateContext context : queue.pendingStates) {
final String pendingMaster = context.state.nodes().masterNodeId();
assertThat("found a cluster state from [" + pendingMaster
+ "], after a state from [" + processedMaster + "] was proccessed",
pendingMaster, equalTo(processedMaster));
}
// and check all committed contexts from another master were failed
for (ClusterStateContext context : committedContexts) {
if (context.state.nodes().masterNodeId().equals(processedMaster) == false) {
assertThat(((MockListener) context.listener).failure, notNullValue());
}
}
}
public void testFailedStateCleansSupersededStatesOnly() {
List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 50), "master1", "master2", "master3", "master4");
PendingClusterStatesQueue queue = createQueueWithStates(states);
List<ClusterStateContext> committedContexts = randomCommitStates(queue);
ClusterState toFail = randomFrom(committedContexts).state;
queue.markAsFailed(toFail, new ElasticsearchException("boo!"));
final Map<String, ClusterStateContext> committedContextsById = new HashMap<>();
for (ClusterStateContext context : committedContexts) {
committedContextsById.put(context.stateUUID(), context);
}
// now check that queue doesn't contain superseded states
for (ClusterStateContext context : queue.pendingStates) {
if (context.committed()) {
assertFalse("found a committed cluster state, which is superseded by a failed state.\nFound:" + context.state + "\nfailed:" + toFail,
toFail.supersedes(context.state));
}
}
// check no state has been erroneously removed
for (ClusterState state : states) {
ClusterStateContext pendingContext = queue.findState(state.stateUUID());
if (pendingContext != null) {
continue;
}
if (state.equals(toFail)) {
continue;
}
assertThat("non-committed states should never be removed", committedContextsById, hasKey(state.stateUUID()));
final ClusterStateContext context = committedContextsById.get(state.stateUUID());
assertThat("removed state is not superseded by failed state. \nRemoved state:" + context + "\nfailed: " + toFail,
toFail.supersedes(context.state), equalTo(true));
assertThat("removed state was failed with wrong exception", ((MockListener) context.listener).failure, notNullValue());
assertThat("removed state was failed with wrong exception", ((MockListener) context.listener).failure.getMessage(), containsString("boo"));
}
}
public void testFailAllAndClear() {
List<ClusterState> states = randomStates(scaledRandomIntBetween(10, 50), "master1", "master2", "master3", "master4");
PendingClusterStatesQueue queue = createQueueWithStates(states);
List<ClusterStateContext> committedContexts = randomCommitStates(queue);
queue.failAllStatesAndClear(new ElasticsearchException("boo!"));
assertThat(queue.pendingStates, empty());
assertThat(queue.getNextClusterStateToProcess(), nullValue());
for (ClusterStateContext context : committedContexts) {
assertThat("state was failed with wrong exception", ((MockListener) context.listener).failure, notNullValue());
assertThat("state was failed with wrong exception", ((MockListener) context.listener).failure.getMessage(), containsString("boo"));
}
}
protected List<ClusterStateContext> randomCommitStates(PendingClusterStatesQueue queue) {
List<ClusterStateContext> committedContexts = new ArrayList<>();
for (int iter = randomInt(queue.pendingStates.size() - 1); iter >= 0; iter--) {
ClusterState state = queue.markAsCommitted(randomFrom(queue.pendingStates).stateUUID(), new MockListener());
if (state != null) {
// null cluster state means we committed twice
committedContexts.add(queue.findState(state.stateUUID()));
}
}
return committedContexts;
}
PendingClusterStatesQueue createQueueWithStates(List<ClusterState> states) {
return createQueueWithStates(states, states.size() * 2); // we don't care about limits (there are dedicated tests for that)
}
PendingClusterStatesQueue createQueueWithStates(List<ClusterState> states, int maxQueueSize) {
PendingClusterStatesQueue queue;
queue = new PendingClusterStatesQueue(logger, maxQueueSize);
for (ClusterState state : states) {
queue.addPending(state);
}
return queue;
}
List<ClusterState> randomStates(int count, String... masters) {
ArrayList<ClusterState> states = new ArrayList<>(count);
ClusterState[] lastClusterStatePerMaster = new ClusterState[masters.length];
for (; count > 0; count--) {
int masterIndex = randomInt(masters.length - 1);
ClusterState state = lastClusterStatePerMaster[masterIndex];
if (state == null) {
state = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder()
.put(new DiscoveryNode(masters[masterIndex], DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId(masters[masterIndex]).build()
).build();
} else {
state = ClusterState.builder(state).incrementVersion().build();
}
states.add(state);
lastClusterStatePerMaster[masterIndex] = state;
}
return states;
}
static class MockListener implements PendingClusterStatesQueue.StateProcessedListener {
volatile boolean processed;
volatile Throwable failure;
@Override
public void onNewClusterStateProcessed() {
processed = true;
}
@Override
public void onNewClusterStateFailed(Throwable t) {
failure = t;
}
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -52,10 +51,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -71,7 +67,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = newHashMap();
public static class MockNode implements PublishClusterStateAction.NewClusterStateListener, DiscoveryNodesProvider {
public static class MockNode implements PublishClusterStateAction.NewPendingClusterStateListener, DiscoveryNodesProvider {
public final DiscoveryNode discoveryNode;
public final MockTransportService service;
public MockPublishAction action;
@ -89,19 +85,33 @@ public class PublishClusterStateActionTests extends ESTestCase {
this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.id()).build()).build();
}
public MockNode setAsMaster() {
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(discoveryNode.id())).build();
return this;
}
public MockNode resetMasterId() {
this.clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).masterNodeId(null)).build();
return this;
}
public void connectTo(DiscoveryNode node) {
service.connectToNode(node);
}
@Override
public void onNewClusterState(ClusterState newClusterState, NewStateProcessed newStateProcessed) {
public void onNewClusterState(String reason) {
ClusterState newClusterState = action.pendingStatesQueue().getNextClusterStateToProcess();
logger.debug("[{}] received version [{}], uuid [{}]", discoveryNode.name(), newClusterState.version(), newClusterState.stateUUID());
if (listener != null) {
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
listener.clusterChanged(event);
}
clusterState = newClusterState;
newStateProcessed.onNewClusterStateProcessed();
if (clusterState.nodes().masterNode() == null || newClusterState.supersedes(clusterState)) {
clusterState = newClusterState;
}
action.pendingStatesQueue().markAsProcessed(newClusterState);
}
@Override
@ -211,22 +221,21 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider,
PublishClusterStateAction.NewClusterStateListener listener) {
PublishClusterStateAction.NewPendingClusterStateListener listener) {
DiscoverySettings discoverySettings = new DiscoverySettings(settings, new NodeSettingsService(settings));
return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
}
@Test
public void testSimpleClusterStatePublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT);
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT).setAsMaster();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = nodeA.clusterState;
// cluster state update - add nodeB
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(clusterState.nodes()).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
@ -277,6 +286,11 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertSameStateFromFull(nodeC.clusterState, clusterState);
assertFalse(nodeC.clusterState.wasReadFromDiff());
// node A steps down from being master
nodeA.resetMasterId();
nodeB.resetMasterId();
nodeC.resetMasterId();
// node B becomes the master and sends a version of the cluster state that goes back
discoveryNodes = DiscoveryNodes.builder(discoveryNodes)
.put(nodeA.discoveryNode)
@ -300,12 +314,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void clusterChanged(ClusterChangedEvent event) {
fail("Shouldn't send cluster state to myself");
}
});
}).setAsMaster();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
@ -337,7 +351,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
});
// Initial cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).masterNodeId(nodeA.discoveryNode.id()).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
// cluster state update - add nodeB
@ -354,16 +368,21 @@ public class PublishClusterStateActionTests extends ESTestCase {
/**
* Test not waiting publishing works correctly (i.e., publishing times out)
* Test not waiting on publishing works correctly (i.e., publishing times out)
*/
@Test
public void testSimultaneousClusterStatePublishing() throws Exception {
int numberOfNodes = randomIntBetween(2, 10);
int numberOfIterations = scaledRandomIntBetween(5, 50);
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build();
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder();
MockNode master = null;
for (int i = 0; i < numberOfNodes; i++) {
MockNode master = createMockNode("node0", settings, Version.CURRENT, new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
}
}).setAsMaster();
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(master.nodes());
for (int i = 1; i < numberOfNodes; i++) {
final String name = "node" + i;
final MockNode node = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() {
@Override
@ -371,14 +390,10 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
}
});
if (i == 0) {
master = node;
}
discoveryNodesBuilder.put(node.discoveryNode);
}
AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations];
discoveryNodesBuilder.localNodeId(master.discoveryNode.id());
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
@ -398,8 +413,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
master.clusterState = clusterState;
for (MockNode node : nodes.values()) {
assertThat(node.discoveryNode + " misses a cluster state", node.clusterState, notNullValue());
assertThat(node.discoveryNode + " unexpected cluster state: " + node.clusterState, node.clusterState.version(), equalTo(clusterState.version()));
assertSameState(node.clusterState, clusterState);
assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode));
}
}
@ -412,12 +426,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void clusterChanged(ClusterChangedEvent event) {
fail("Shouldn't send cluster state to myself");
}
});
}).setAsMaster();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
publishStateAndWait(nodeA.action, clusterState, previousClusterState);
@ -612,69 +626,59 @@ public class PublishClusterStateActionTests extends ESTestCase {
} catch (IllegalStateException OK) {
}
logger.info("--> testing rejection of an old cluster state");
logger.info("--> testing acceptance of an old cluster state");
state = node.clusterState;
node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build();
try {
node.action.validateIncomingState(state, node.clusterState);
fail("node accepted state with an older version");
} catch (IllegalStateException OK) {
}
node.action.validateIncomingState(state, node.clusterState);
// an older version from a *new* master is OK!
// an older version from a *new* master is also OK!
ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build();
state = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId("_new_master_").build())
.build();
// remove the master of the node (but still have a previous cluster state with it)!
node.clusterState = ClusterState.builder(node.clusterState)
.nodes(DiscoveryNodes.builder(node.clusterState.nodes()).masterNodeId(null).build())
.build();
node.resetMasterId();
node.action.validateIncomingState(state, previousState);
}
public void testInterleavedPublishCommit() throws Throwable {
MockNode node = createMockNode("node");
final ClusterState state1 = ClusterState.builder(node.clusterState).incrementVersion().build();
final ClusterState state2 = ClusterState.builder(state1).incrementVersion().build();
final BytesReference state1Bytes = PublishClusterStateAction.serializeFullClusterState(state1, Version.CURRENT);
final BytesReference state2Bytes = PublishClusterStateAction.serializeFullClusterState(state2, Version.CURRENT);
MockNode node = createMockNode("node").setAsMaster();
final CapturingTransportChannel channel = new CapturingTransportChannel();
node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state1Bytes, Version.CURRENT), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
List<ClusterState> states = new ArrayList<>();
final int numOfStates = scaledRandomIntBetween(3, 10);
for (int i = 1; i <= numOfStates; i++) {
states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build());
}
// another incoming state is OK. Should just override pending state
node.action.handleIncomingClusterStateRequest(new BytesTransportRequest(state2Bytes, Version.CURRENT), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
final ClusterState finalState = states.get(numOfStates - 1);
Collections.shuffle(states, random());
// committing previous state should fail
try {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state1.stateUUID()), channel);
// sadly, there are ways to percolate errors
assertThat(channel.response.get(), nullValue());
assertThat(channel.error.get(), notNullValue());
if (channel.error.get() instanceof IllegalStateException == false) {
logger.info("--> publishing states");
for (ClusterState state : states) {
node.action.handleIncomingClusterStateRequest(
new BytesTransportRequest(PublishClusterStateAction.serializeFullClusterState(state, Version.CURRENT), Version.CURRENT),
channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
}
logger.info("--> committing states");
Collections.shuffle(states, random());
for (ClusterState state : states) {
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
if (channel.error.get() != null) {
throw channel.error.get();
}
} catch (IllegalStateException OK) {
}
channel.clear();
// committing second state should succeed
node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state2.stateUUID()), channel);
assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
assertThat(channel.error.get(), nullValue());
channel.clear();
// now check it was really committed
assertSameState(node.clusterState, state2);
//now check the last state held
assertSameState(node.clusterState, finalState);
}
/**
@ -809,7 +813,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
AtomicBoolean timeoutOnCommit = new AtomicBoolean();
AtomicBoolean errorOnCommit = new AtomicBoolean();
public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName);
}

View File

@ -91,6 +91,8 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
@ -126,33 +128,15 @@ import org.junit.BeforeClass;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -588,6 +572,20 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
if (isInternalCluster()) {
// check no pending cluster states are leaked
for (Discovery discovery : internalCluster().getInstances(Discovery.class)) {
if (discovery instanceof ZenDiscovery) {
final ZenDiscovery zenDiscovery = (ZenDiscovery) discovery;
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(zenDiscovery.pendingClusterStates(), emptyArray());
}
});
}
}
}
beforeIndexDeletion();
cluster().wipe(); // wipe after to make sure we fail in the test that didn't ack the delete
if (afterClass || currentClusterScope == Scope.TEST) {
@ -1615,7 +1613,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
private Scope getCurrentClusterScope() {
return getCurrentClusterScope(this.getClass());
}
@ -1750,7 +1747,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
String nodeMode = InternalTestCluster.configuredNodeMode();
if (noLocal != null && noNetwork != null) {
throw new IllegalStateException("Can't suppress both network and local mode");
} else if (noLocal != null){
} else if (noLocal != null) {
nodeMode = "network";
} else if (noNetwork != null) {
nodeMode = "local";
@ -2042,13 +2039,15 @@ public abstract class ESIntegTestCase extends ESTestCase {
*/
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SuppressLocalMode {}
public @interface SuppressLocalMode {
}
/**
* If used the test will never run in network mode
*/
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SuppressNetworkMode {}
public @interface SuppressNetworkMode {
}
}