[Discovery] Prevent stale master nodes from sharing dated cluster states to nodes that have moved to a different master node.
If an elected master node goes into a long gc then other nodes' fault detection will notice this and a new master election is started and eventually a new master node is elected. If the previous master nodes goes out of the long gc it can still have pending tasks which can result in new cluster state updates. Nodes that are still in the nodes list of this previous elected master node can get these cluster state updates. This commit makes sure that this dated cluster states are not accepted by these nodes. This issue can temporary lead to the fact that non elected master nodes switch to the previous elected master node. The new elected master node also gets the same dated cluster state, but rejects it and tells the previous elected master node to step down and rejoin. Because the new elected master is the only master node the previous elected master node will follow the new elected master node. Any nodes that follow the previous elected master node (by accident), will also rejoin and follow the new elected master node because their master fault detection will fail. So all in all this isn't a severe problem, because the problem fixes itself eventually. Closes #9632
This commit is contained in:
parent
7d3856c9d3
commit
4fddda307f
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
|
@ -69,6 +70,7 @@ import org.elasticsearch.transport.*;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
@ -744,10 +746,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
|
||||
processNewClusterStates.add(processClusterState);
|
||||
|
||||
|
||||
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
|
||||
assert !newClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
|
||||
|
||||
ClusterState currentState = clusterService.state();
|
||||
if (shouldIgnoreNewClusterState(logger, currentState, newClusterState)) {
|
||||
return;
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
|
@ -761,49 +767,18 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
// to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state
|
||||
|
||||
|
||||
// try and get the state with the highest version out of all the ones with the same master node id
|
||||
ProcessClusterState stateToProcess = processNewClusterStates.poll();
|
||||
if (stateToProcess == null) {
|
||||
return currentState;
|
||||
ClusterState updatedState = selectNextStateToProcess(processNewClusterStates);
|
||||
if (updatedState == null) {
|
||||
updatedState = currentState;
|
||||
}
|
||||
stateToProcess.processed = true;
|
||||
while (true) {
|
||||
ProcessClusterState potentialState = processNewClusterStates.peek();
|
||||
// nothing else in the queue, bail
|
||||
if (potentialState == null) {
|
||||
break;
|
||||
}
|
||||
// if its not from the same master, then bail
|
||||
if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) {
|
||||
break;
|
||||
}
|
||||
|
||||
// we are going to use it for sure, poll (remove) it
|
||||
potentialState = processNewClusterStates.poll();
|
||||
if (potentialState == null) {
|
||||
// might happen if the queue is drained
|
||||
break;
|
||||
}
|
||||
|
||||
potentialState.processed = true;
|
||||
|
||||
if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
|
||||
// we found a new one
|
||||
stateToProcess = potentialState;
|
||||
}
|
||||
}
|
||||
|
||||
ClusterState updatedState = stateToProcess.clusterState;
|
||||
|
||||
// if the new state has a smaller version, and it has the same master node, then no need to process it
|
||||
if (updatedState.version() < currentState.version() && Objects.equal(updatedState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
|
||||
if (shouldIgnoreNewClusterState(logger, currentState, updatedState)) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
// we don't need to do this, since we ping the master, and get notified when it has moved from being a master
|
||||
// because it doesn't have enough master nodes...
|
||||
//if (!electMaster.hasEnoughMasterNodes(newState.nodes())) {
|
||||
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]");
|
||||
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state wreceived from [" + newState.nodes().masterNode() + "]");
|
||||
//}
|
||||
|
||||
// check to see that we monitor the correct master of the cluster
|
||||
|
@ -866,6 +841,65 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Picks the cluster state with highest version with the same master from the queue. All cluster states with
|
||||
* lower versions are ignored. If a cluster state with a different master is seen the processing logic stops and the
|
||||
* last processed state is returned.
|
||||
*/
|
||||
static ClusterState selectNextStateToProcess(Queue<ProcessClusterState> processNewClusterStates) {
|
||||
// try and get the state with the highest version out of all the ones with the same master node id
|
||||
ProcessClusterState stateToProcess = processNewClusterStates.poll();
|
||||
if (stateToProcess == null) {
|
||||
return null;
|
||||
}
|
||||
stateToProcess.processed = true;
|
||||
while (true) {
|
||||
ProcessClusterState potentialState = processNewClusterStates.peek();
|
||||
// nothing else in the queue, bail
|
||||
if (potentialState == null) {
|
||||
break;
|
||||
}
|
||||
// if its not from the same master, then bail
|
||||
if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) {
|
||||
break;
|
||||
}
|
||||
// we are going to use it for sure, poll (remove) it
|
||||
potentialState = processNewClusterStates.poll();
|
||||
if (potentialState == null) {
|
||||
// might happen if the queue is drained
|
||||
break;
|
||||
}
|
||||
potentialState.processed = true;
|
||||
|
||||
if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
|
||||
// we found a new one
|
||||
stateToProcess = potentialState;
|
||||
}
|
||||
}
|
||||
return stateToProcess.clusterState;
|
||||
}
|
||||
|
||||
/**
|
||||
* In the case we follow an elected master the new cluster state needs to have the same elected master and
|
||||
* the new cluster state version needs to be equal or higher than our cluster state version. If either conditions
|
||||
* are true then the cluster state is dated and we should ignore it.
|
||||
*/
|
||||
static boolean shouldIgnoreNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
|
||||
if (currentState.nodes().masterNodeId() == null) {
|
||||
return false;
|
||||
}
|
||||
if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
|
||||
logger.warn("received a cluster state from a different master then the current one, ignoring (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
|
||||
return true;
|
||||
} else if (newClusterState.version() < currentState.version()) {
|
||||
// if the new state has a smaller version, and it has the same master node, then no need to process it
|
||||
logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
|
||||
|
||||
if (!transportService.addressSupported(node.address().getClass())) {
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.discovery.zen.fd;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
|
|
@ -27,8 +27,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
|||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -38,6 +37,7 @@ import org.elasticsearch.cluster.routing.DjbHashFunction;
|
|||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -557,7 +557,7 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
|
|||
|
||||
String oldMasterNode = internalCluster().getMasterName();
|
||||
// a very long GC, but it's OK as we remove the disruption when it has had an effect
|
||||
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(oldMasterNode, getRandom(), 100, 200, 30000, 60000);
|
||||
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(oldMasterNode, getRandom(), 100, 200, 30000, 60000);
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
|
@ -575,14 +575,7 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
|
|||
ensureStableCluster(2, oldNonMasterNodes.get(0));
|
||||
|
||||
logger.info("waiting for any pinging to stop");
|
||||
for (final String node : oldNonMasterNodes) {
|
||||
assertTrue("node [" + node + "] is still joining master", awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster();
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS));
|
||||
}
|
||||
assertDiscoveryCompleted(oldNonMasterNodes);
|
||||
|
||||
// restore GC
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
|
@ -595,6 +588,99 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
|
|||
assertMaster(newMaster, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that emulates a frozen elected master node that unfreezes and pushes his cluster state to other nodes
|
||||
* that already are following another elected master node. These nodes should reject this cluster state and prevent
|
||||
* them from following the stale master.
|
||||
*/
|
||||
@Test
|
||||
public void testStaleMasterNotHijackingMajority() throws Exception {
|
||||
// TODO: on mac OS multicast threads are shared between nodes and we therefore we can't simulate GC and stop pinging for just one node
|
||||
// find a way to block thread creation in the generic thread pool to avoid this.
|
||||
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
|
||||
List<String> nodes = startUnicastCluster(3, null, 2);
|
||||
|
||||
// Save the current master node as old master node, because that node will get frozen
|
||||
final String oldMasterNode = internalCluster().getMasterName();
|
||||
for (String node : nodes) {
|
||||
ensureStableCluster(3, node);
|
||||
}
|
||||
assertMaster(oldMasterNode, nodes);
|
||||
|
||||
// Simulating a painful gc by suspending all threads for a long time on the current elected master node.
|
||||
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(getRandom(), oldMasterNode);
|
||||
|
||||
// Save the majority side
|
||||
final List<String> majoritySide = new ArrayList<>(nodes);
|
||||
majoritySide.remove(oldMasterNode);
|
||||
|
||||
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
|
||||
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String, String>>>());
|
||||
for (final String node : majoritySide) {
|
||||
masters.put(node, new ArrayList<Tuple<String, String>>());
|
||||
internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() {
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
|
||||
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
|
||||
if (!Objects.equals(previousMaster, currentMaster)) {
|
||||
String previousMasterNodeName = previousMaster != null ? previousMaster.name() : null;
|
||||
String currentMasterNodeName = currentMaster != null ? currentMaster.name() : null;
|
||||
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
logger.info("freezing node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
// Wait for the majority side to get stable
|
||||
ensureStableCluster(2, majoritySide.get(0));
|
||||
ensureStableCluster(2, majoritySide.get(1));
|
||||
|
||||
// The old master node is frozen, but here we submit a cluster state update task that doesn't get executed,
|
||||
// but will be queued and once the old master node un-freezes it gets executed.
|
||||
// The old master node will send this update + the cluster state where he is flagged as master to the other
|
||||
// nodes that follow the new master. These nodes should ignore this update.
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return ClusterState.builder(currentState).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Throwable t) {
|
||||
logger.warn("failure [{}]", t, source);
|
||||
}
|
||||
});
|
||||
|
||||
// Save the new elected master node
|
||||
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
|
||||
logger.info("new detected master node [{}]", newMasterNode);
|
||||
|
||||
// Stop disruption
|
||||
logger.info("Unfreeze node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
|
||||
// Make sure that the end state is consistent on all nodes:
|
||||
assertDiscoveryCompleted(nodes);
|
||||
assertMaster(newMasterNode, nodes);
|
||||
|
||||
|
||||
assertThat(masters.size(), equalTo(2));
|
||||
for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
|
||||
String nodeName = entry.getKey();
|
||||
List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
|
||||
assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), equalTo(2));
|
||||
assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), equalTo(oldMasterNode));
|
||||
assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(0).v2(), nullValue());
|
||||
assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), nullValue());
|
||||
assertThat("[" + nodeName + "] jSecond transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a document which is indexed on the majority side of a partition, is available from the minory side,
|
||||
* once the partition is healed
|
||||
|
@ -962,4 +1048,15 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
|
|||
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, state.nodes().masterNode().name(), equalTo(masterNode));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
|
||||
for (final String node : nodes) {
|
||||
assertTrue("node [" + node + "] is still joining master", awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object input) {
|
||||
return !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster();
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
|
||||
import static org.elasticsearch.discovery.zen.ZenDiscovery.ProcessClusterState;
|
||||
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreNewClusterState;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.hamcrest.core.IsNull.nullValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ZenDiscoveryUnitTest extends ElasticsearchTestCase {
|
||||
|
||||
public void testShouldIgnoreNewClusterState() {
|
||||
ClusterName clusterName = new ClusterName("abc");
|
||||
|
||||
DiscoveryNodes.Builder currentNodes = DiscoveryNodes.builder();
|
||||
currentNodes.masterNodeId("a");
|
||||
DiscoveryNodes.Builder newNodes = DiscoveryNodes.builder();
|
||||
newNodes.masterNodeId("a");
|
||||
|
||||
ClusterState.Builder currentState = ClusterState.builder(clusterName);
|
||||
currentState.nodes(currentNodes);
|
||||
ClusterState.Builder newState = ClusterState.builder(clusterName);
|
||||
newState.nodes(newNodes);
|
||||
|
||||
currentState.version(2);
|
||||
newState.version(1);
|
||||
assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
|
||||
currentState.version(1);
|
||||
newState.version(1);
|
||||
assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
|
||||
currentState.version(1);
|
||||
newState.version(2);
|
||||
assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
|
||||
|
||||
currentNodes = DiscoveryNodes.builder();
|
||||
currentNodes.masterNodeId("b");
|
||||
// version isn't taken into account, so randomize it to ensure this.
|
||||
if (randomBoolean()) {
|
||||
currentState.version(2);
|
||||
newState.version(1);
|
||||
} else {
|
||||
currentState.version(1);
|
||||
newState.version(2);
|
||||
}
|
||||
currentState.nodes(currentNodes);
|
||||
assertTrue("should ignore, because current state's master is not equal to new state's master", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
|
||||
|
||||
currentNodes = DiscoveryNodes.builder();
|
||||
currentNodes.masterNodeId(null);
|
||||
currentState.nodes(currentNodes);
|
||||
// version isn't taken into account, so randomize it to ensure this.
|
||||
if (randomBoolean()) {
|
||||
currentState.version(2);
|
||||
newState.version(1);
|
||||
} else {
|
||||
currentState.version(1);
|
||||
newState.version(2);
|
||||
}
|
||||
assertFalse("should not ignore, because current state doesn't have a master", shouldIgnoreNewClusterState(logger, currentState.build(), newState.build()));
|
||||
}
|
||||
|
||||
public void testSelectNextStateToProcess_empty() {
|
||||
Queue<ProcessClusterState> queue = new LinkedList<>();
|
||||
assertThat(ZenDiscovery.selectNextStateToProcess(queue), nullValue());
|
||||
}
|
||||
|
||||
public void testSelectNextStateToProcess() {
|
||||
ClusterName clusterName = new ClusterName("abc");
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().masterNodeId("a").build();
|
||||
|
||||
int numUpdates = scaledRandomIntBetween(50, 100);
|
||||
LinkedList<ProcessClusterState> queue = new LinkedList<>();
|
||||
for (int i = 0; i < numUpdates; i++) {
|
||||
queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build(), null));
|
||||
}
|
||||
ProcessClusterState mostRecent = queue.get(numUpdates - 1);
|
||||
Collections.shuffle(queue, getRandom());
|
||||
|
||||
assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState));
|
||||
assertThat(mostRecent.processed, is(true));
|
||||
assertThat(queue.size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testSelectNextStateToProcess_differentMasters() {
|
||||
ClusterName clusterName = new ClusterName("abc");
|
||||
DiscoveryNodes nodes1 = DiscoveryNodes.builder().masterNodeId("a").build();
|
||||
DiscoveryNodes nodes2 = DiscoveryNodes.builder().masterNodeId("b").build();
|
||||
|
||||
LinkedList<ProcessClusterState> queue = new LinkedList<>();
|
||||
ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build(), null);
|
||||
queue.offer(thirdMostRecent);
|
||||
ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build(), null);
|
||||
queue.offer(secondMostRecent);
|
||||
ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build(), null);
|
||||
queue.offer(mostRecent);
|
||||
Collections.shuffle(queue, getRandom());
|
||||
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build(), null));
|
||||
queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build(), null));
|
||||
|
||||
|
||||
assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState));
|
||||
assertThat(thirdMostRecent.processed, is(true));
|
||||
assertThat(secondMostRecent.processed, is(true));
|
||||
assertThat(mostRecent.processed, is(true));
|
||||
assertThat(queue.size(), equalTo(2));
|
||||
assertThat(queue.get(0).processed, is(false));
|
||||
assertThat(queue.get(1).processed, is(false));
|
||||
}
|
||||
|
||||
}
|
|
@ -51,6 +51,7 @@ import org.elasticsearch.cluster.routing.OperationRouting;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
|
@ -1290,11 +1291,20 @@ public final class InternalTestCluster extends TestCluster {
|
|||
|
||||
|
||||
/**
|
||||
* get the name of the current master node
|
||||
* Returns the name of the current master node in the cluster.
|
||||
*/
|
||||
public String getMasterName() {
|
||||
return getMasterName(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the current master node in the cluster and executes the request via the node specified
|
||||
* in the viaNode parameter. If viaNode isn't specified a random node will be picked to the send the request to.
|
||||
*/
|
||||
public String getMasterName(@Nullable String viaNode) {
|
||||
try {
|
||||
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
|
||||
Client client = viaNode != null ? client(viaNode) : client();
|
||||
ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState();
|
||||
return state.nodes().masterNode().name();
|
||||
} catch (Throwable e) {
|
||||
logger.warn("Can't fetch cluster state", e);
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Simulates irregular long gc intervals.
|
||||
*/
|
||||
public class IntermittentLongGCDisruption extends LongGCDisruption {
|
||||
|
||||
volatile boolean disrupting;
|
||||
volatile Thread worker;
|
||||
|
||||
final long intervalBetweenDelaysMin;
|
||||
final long intervalBetweenDelaysMax;
|
||||
final long delayDurationMin;
|
||||
final long delayDurationMax;
|
||||
|
||||
|
||||
public IntermittentLongGCDisruption(Random random) {
|
||||
this(null, random);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(String disruptedNode, Random random) {
|
||||
this(disruptedNode, random, 100, 200, 300, 20000);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(String disruptedNode, Random random, long intervalBetweenDelaysMin,
|
||||
long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) {
|
||||
this(random, disruptedNode, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax);
|
||||
}
|
||||
|
||||
public IntermittentLongGCDisruption(Random random, String disruptedNode, long intervalBetweenDelaysMin, long intervalBetweenDelaysMax,
|
||||
long delayDurationMin, long delayDurationMax) {
|
||||
super(random, disruptedNode);
|
||||
this.intervalBetweenDelaysMin = intervalBetweenDelaysMin;
|
||||
this.intervalBetweenDelaysMax = intervalBetweenDelaysMax;
|
||||
this.delayDurationMin = delayDurationMin;
|
||||
this.delayDurationMax = delayDurationMax;
|
||||
}
|
||||
|
||||
final static AtomicInteger thread_ids = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void startDisrupting() {
|
||||
disrupting = true;
|
||||
worker = new Thread(new BackgroundWorker(), "long_gc_simulation_" + thread_ids.incrementAndGet());
|
||||
worker.setDaemon(true);
|
||||
worker.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopDisrupting() {
|
||||
if (worker == null) {
|
||||
return;
|
||||
}
|
||||
logger.info("stopping long GCs on [{}]", disruptedNode);
|
||||
disrupting = false;
|
||||
worker.interrupt();
|
||||
try {
|
||||
worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax));
|
||||
} catch (InterruptedException e) {
|
||||
logger.info("background thread failed to stop");
|
||||
}
|
||||
worker = null;
|
||||
}
|
||||
|
||||
private void simulateLongGC(final TimeValue duration) throws InterruptedException {
|
||||
final String disruptionNodeCopy = disruptedNode;
|
||||
if (disruptionNodeCopy == null) {
|
||||
return;
|
||||
}
|
||||
logger.info("node [{}] goes into GC for for [{}]", disruptionNodeCopy, duration);
|
||||
final Set<Thread> nodeThreads = new HashSet<>();
|
||||
try {
|
||||
while (stopNodeThreads(disruptionNodeCopy, nodeThreads)) ;
|
||||
if (!nodeThreads.isEmpty()) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} finally {
|
||||
logger.info("node [{}] resumes from GC", disruptionNodeCopy);
|
||||
resumeThreads(nodeThreads);
|
||||
}
|
||||
}
|
||||
|
||||
class BackgroundWorker implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (disrupting && disruptedNode != null) {
|
||||
try {
|
||||
TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin)));
|
||||
simulateLongGC(duration);
|
||||
|
||||
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
|
||||
if (disrupting && disruptedNode != null) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} catch (Exception e) {
|
||||
logger.error("error in background worker", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -16,6 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.disruption;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -23,76 +24,50 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Suspends all threads on the specified node in order to simulate a long gc.
|
||||
*/
|
||||
public class LongGCDisruption extends SingleNodeDisruption {
|
||||
|
||||
volatile boolean disrupting;
|
||||
volatile Thread worker;
|
||||
|
||||
final long intervalBetweenDelaysMin;
|
||||
final long intervalBetweenDelaysMax;
|
||||
final long delayDurationMin;
|
||||
final long delayDurationMax;
|
||||
|
||||
|
||||
public LongGCDisruption(Random random) {
|
||||
this(null, random);
|
||||
}
|
||||
|
||||
public LongGCDisruption(String disruptedNode, Random random) {
|
||||
this(disruptedNode, random, 100, 200, 300, 20000);
|
||||
}
|
||||
|
||||
public LongGCDisruption(String disruptedNode, Random random, long intervalBetweenDelaysMin,
|
||||
long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) {
|
||||
this(random, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax);
|
||||
this.disruptedNode = disruptedNode;
|
||||
}
|
||||
|
||||
public LongGCDisruption(Random random,
|
||||
long intervalBetweenDelaysMin, long intervalBetweenDelaysMax, long delayDurationMin,
|
||||
long delayDurationMax) {
|
||||
super(random);
|
||||
this.intervalBetweenDelaysMin = intervalBetweenDelaysMin;
|
||||
this.intervalBetweenDelaysMax = intervalBetweenDelaysMax;
|
||||
this.delayDurationMin = delayDurationMin;
|
||||
this.delayDurationMax = delayDurationMax;
|
||||
}
|
||||
|
||||
final static AtomicInteger thread_ids = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
public void startDisrupting() {
|
||||
disrupting = true;
|
||||
worker = new Thread(new BackgroundWorker(), "long_gc_simulation_" + thread_ids.incrementAndGet());
|
||||
worker.setDaemon(true);
|
||||
worker.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopDisrupting() {
|
||||
if (worker == null) {
|
||||
return;
|
||||
}
|
||||
logger.info("stopping long GCs on [{}]", disruptedNode);
|
||||
disrupting = false;
|
||||
worker.interrupt();
|
||||
try {
|
||||
worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax));
|
||||
} catch (InterruptedException e) {
|
||||
logger.info("background thread failed to stop");
|
||||
}
|
||||
worker = null;
|
||||
}
|
||||
|
||||
final static Pattern[] unsafeClasses = new Pattern[]{
|
||||
private final static Pattern[] unsafeClasses = new Pattern[]{
|
||||
// logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing
|
||||
Pattern.compile("Logger")
|
||||
};
|
||||
|
||||
private boolean stopNodeThreads(String node, Set<Thread> nodeThreads) {
|
||||
protected final String disruptedNode;
|
||||
private Set<Thread> suspendedThreads;
|
||||
|
||||
public LongGCDisruption(Random random, String disruptedNode) {
|
||||
super(random);
|
||||
this.disruptedNode = disruptedNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void startDisrupting() {
|
||||
if (suspendedThreads == null) {
|
||||
suspendedThreads = new HashSet<>();
|
||||
stopNodeThreads(disruptedNode, suspendedThreads);
|
||||
} else {
|
||||
throw new IllegalStateException("can't disrupt twice, call stopDisrupting() first");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stopDisrupting() {
|
||||
if (suspendedThreads != null) {
|
||||
resumeThreads(suspendedThreads);
|
||||
suspendedThreads = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue expectedTimeToHeal() {
|
||||
return TimeValue.timeValueMillis(0);
|
||||
}
|
||||
|
||||
protected boolean stopNodeThreads(String node, Set<Thread> nodeThreads) {
|
||||
Set<Thread> allThreadsSet = Thread.getAllStackTraces().keySet();
|
||||
boolean stopped = false;
|
||||
final String nodeThreadNamePart = "[" + node + "]";
|
||||
|
@ -124,54 +99,9 @@ public class LongGCDisruption extends SingleNodeDisruption {
|
|||
return stopped;
|
||||
}
|
||||
|
||||
private void resumeThreads(Set<Thread> threads) {
|
||||
protected void resumeThreads(Set<Thread> threads) {
|
||||
for (Thread thread : threads) {
|
||||
thread.resume();
|
||||
}
|
||||
}
|
||||
|
||||
private void simulateLongGC(final TimeValue duration) throws InterruptedException {
|
||||
final String disruptionNodeCopy = disruptedNode;
|
||||
if (disruptionNodeCopy == null) {
|
||||
return;
|
||||
}
|
||||
logger.info("node [{}] goes into GC for for [{}]", disruptionNodeCopy, duration);
|
||||
final Set<Thread> nodeThreads = new HashSet<>();
|
||||
try {
|
||||
while (stopNodeThreads(disruptionNodeCopy, nodeThreads)) ;
|
||||
if (!nodeThreads.isEmpty()) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} finally {
|
||||
logger.info("node [{}] resumes from GC", disruptionNodeCopy);
|
||||
resumeThreads(nodeThreads);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue expectedTimeToHeal() {
|
||||
return TimeValue.timeValueMillis(0);
|
||||
}
|
||||
|
||||
class BackgroundWorker implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (disrupting && disruptedNode != null) {
|
||||
try {
|
||||
TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin)));
|
||||
simulateLongGC(duration);
|
||||
|
||||
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
|
||||
if (disrupting && disruptedNode != null) {
|
||||
Thread.sleep(duration.millis());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
} catch (Exception e) {
|
||||
logger.error("error in background worker", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue