Split disruption test suite
The disruption tests sit in a single test suite which causes these tests to be single-threaded. We can split this test suite into multiple suites (logically, of course) enabling them to be run in parallel reducing the total run time of all integration tests in core. This commit splits the discovery with service disruptions test suite into three suites - master disruptions - discovery disruptions - cluster disruptions The last one could probably be better named, it is meant to represent performing actions in the cluster (indexing, failing a shard, etc.) while a disruption is taking place. Relates #24662
This commit is contained in:
parent
fdb6cd8088
commit
bd5aee8cfa
|
@ -554,7 +554,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]zen[/\\]ZenDiscoveryUnitTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]document[/\\]DocumentActionsIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]EnvironmentTests.java" checks="LineLength" />
|
||||
|
|
|
@ -0,0 +1,300 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.FaultDetection;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.DisruptedLinks;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
|
||||
|
||||
static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
|
||||
|
||||
private ClusterDiscoveryConfiguration discoveryConfig;
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void clearConfig() {
|
||||
discoveryConfig = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfShards() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfReplicas() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
private boolean disableBeforeIndexDeletion;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
disableBeforeIndexDeletion = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
|
||||
if (scheme instanceof NetworkDisruption &&
|
||||
((NetworkDisruption) scheme).getNetworkLinkDisruptionType() instanceof NetworkDisruption.NetworkUnresponsive) {
|
||||
// the network unresponsive disruption may leave operations in flight
|
||||
// this is because this disruption scheme swallows requests by design
|
||||
// as such, these operations will never be marked as finished
|
||||
disableBeforeIndexDeletion = true;
|
||||
}
|
||||
super.setDisruptionScheme(scheme);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeIndexDeletion() throws Exception {
|
||||
if (disableBeforeIndexDeletion == false) {
|
||||
super.beforeIndexDeletion();
|
||||
}
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
|
||||
return startCluster(numberOfNodes, -1);
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
|
||||
return startCluster(numberOfNodes, minimumMasterNode, null);
|
||||
}
|
||||
|
||||
List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
|
||||
ExecutionException, InterruptedException {
|
||||
configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
List<String> nodes = internalCluster().startNodes(numberOfNodes);
|
||||
ensureStableCluster(numberOfNodes);
|
||||
|
||||
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
|
||||
static final Settings DEFAULT_SETTINGS = Settings.builder()
|
||||
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
|
||||
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
|
||||
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
|
||||
.put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this
|
||||
// value and the time of disruption and does not recover immediately
|
||||
// when disruption is stop. We should make sure we recover faster
|
||||
// then the default of 30s, causing ensureGreen and friends to time out
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
void configureCluster(
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
int minimumMasterNode
|
||||
) throws ExecutionException, InterruptedException {
|
||||
configureCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
|
||||
}
|
||||
|
||||
void configureCluster(
|
||||
Settings settings,
|
||||
int numberOfNodes,
|
||||
@Nullable int[] unicastHostsOrdinals,
|
||||
int minimumMasterNode
|
||||
) throws ExecutionException, InterruptedException {
|
||||
if (minimumMasterNode < 0) {
|
||||
minimumMasterNode = numberOfNodes / 2 + 1;
|
||||
}
|
||||
logger.info("---> configured unicast");
|
||||
// TODO: Rarely use default settings form some of these
|
||||
Settings nodeSettings = Settings.builder()
|
||||
.put(settings)
|
||||
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes)
|
||||
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode)
|
||||
.build();
|
||||
|
||||
if (discoveryConfig == null) {
|
||||
if (unicastHostsOrdinals == null) {
|
||||
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
|
||||
} else {
|
||||
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ClusterState getNodeClusterState(String node) {
|
||||
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node) throws Exception {
|
||||
assertNoMaster(node, null, TimeValue.timeValueSeconds(10));
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node, TimeValue maxWaitTime) throws Exception {
|
||||
assertNoMaster(node, null, maxWaitTime);
|
||||
}
|
||||
|
||||
void assertNoMaster(final String node, @Nullable final ClusterBlock expectedBlocks, TimeValue maxWaitTime) throws Exception {
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
final DiscoveryNodes nodes = state.nodes();
|
||||
assertNull("node [" + node + "] still has [" + nodes.getMasterNode() + "] as master", nodes.getMasterNode());
|
||||
if (expectedBlocks != null) {
|
||||
for (ClusterBlockLevel level : expectedBlocks.levels()) {
|
||||
assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
|
||||
(level));
|
||||
}
|
||||
}
|
||||
}
|
||||
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
String masterNode = null;
|
||||
if (state.nodes().getMasterNode() != null) {
|
||||
masterNode = state.nodes().getMasterNode().getName();
|
||||
}
|
||||
logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode());
|
||||
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
|
||||
oldMasterNode, not(equalTo(masterNode)));
|
||||
}
|
||||
}, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
void assertMaster(String masterNode, List<String> nodes) throws Exception {
|
||||
assertBusy(() -> {
|
||||
for (String node : nodes) {
|
||||
ClusterState state = getNodeClusterState(node);
|
||||
String failMsgSuffix = "cluster_state:\n" + state;
|
||||
assertThat("wrong node count on [" + node + "]. " + failMsgSuffix, state.nodes().getSize(), equalTo(nodes.size()));
|
||||
String otherMasterNodeName = state.nodes().getMasterNode() != null ? state.nodes().getMasterNode().getName() : null;
|
||||
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, otherMasterNodeName, equalTo(masterNode));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public ServiceDisruptionScheme addRandomDisruptionScheme() {
|
||||
// TODO: add partial partitions
|
||||
NetworkDisruption p;
|
||||
final DisruptedLinks disruptedLinks;
|
||||
if (randomBoolean()) {
|
||||
disruptedLinks = TwoPartitions.random(random(), internalCluster().getNodeNames());
|
||||
} else {
|
||||
disruptedLinks = Bridge.random(random(), internalCluster().getNodeNames());
|
||||
}
|
||||
final NetworkLinkDisruptionType disruptionType;
|
||||
switch (randomInt(2)) {
|
||||
case 0:
|
||||
disruptionType = new NetworkDisruption.NetworkUnresponsive();
|
||||
break;
|
||||
case 1:
|
||||
disruptionType = new NetworkDisconnect();
|
||||
break;
|
||||
case 2:
|
||||
disruptionType = NetworkDisruption.NetworkDelay.random(random());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
final ServiceDisruptionScheme scheme;
|
||||
if (rarely()) {
|
||||
scheme = new SlowClusterStateProcessing(random());
|
||||
} else {
|
||||
scheme = new NetworkDisruption(disruptedLinks, disruptionType);
|
||||
}
|
||||
setDisruptionScheme(scheme);
|
||||
return scheme;
|
||||
}
|
||||
|
||||
NetworkDisruption addRandomDisruptionType(TwoPartitions partitions) {
|
||||
final NetworkLinkDisruptionType disruptionType;
|
||||
if (randomBoolean()) {
|
||||
disruptionType = new NetworkDisruption.NetworkUnresponsive();
|
||||
} else {
|
||||
disruptionType = new NetworkDisconnect();
|
||||
}
|
||||
NetworkDisruption partition = new NetworkDisruption(partitions, disruptionType);
|
||||
|
||||
setDisruptionScheme(partition);
|
||||
|
||||
return partition;
|
||||
}
|
||||
|
||||
TwoPartitions isolateNode(String isolatedNode) {
|
||||
Set<String> side1 = new HashSet<>();
|
||||
Set<String> side2 = new HashSet<>(Arrays.asList(internalCluster().getNodeNames()));
|
||||
side1.add(isolatedNode);
|
||||
side2.remove(isolatedNode);
|
||||
|
||||
return new TwoPartitions(side1, side2);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,452 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* Tests various cluster operations (e.g., indexing) during disruptions.
|
||||
*/
|
||||
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
/**
|
||||
* Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme
|
||||
* We also collect & report the type of indexing failures that occur.
|
||||
* <p>
|
||||
* This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
|
||||
*/
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.action.bulk:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE," +
|
||||
"org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
|
||||
"org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
|
||||
public void testAckedIndexing() throws Exception {
|
||||
|
||||
final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
|
||||
final String timeout = seconds + "s";
|
||||
|
||||
final List<String> nodes = startCluster(rarely() ? 5 : 3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
||||
));
|
||||
ensureGreen();
|
||||
|
||||
ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme();
|
||||
logger.info("disruption scheme [{}] added", disruptionScheme);
|
||||
|
||||
final ConcurrentHashMap<String, String> ackedDocs = new ConcurrentHashMap<>(); // id -> node sent.
|
||||
|
||||
final AtomicBoolean stop = new AtomicBoolean(false);
|
||||
List<Thread> indexers = new ArrayList<>(nodes.size());
|
||||
List<Semaphore> semaphores = new ArrayList<>(nodes.size());
|
||||
final AtomicInteger idGenerator = new AtomicInteger(0);
|
||||
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
|
||||
final List<Exception> exceptedExceptions = Collections.synchronizedList(new ArrayList<Exception>());
|
||||
|
||||
logger.info("starting indexers");
|
||||
try {
|
||||
for (final String node : nodes) {
|
||||
final Semaphore semaphore = new Semaphore(0);
|
||||
semaphores.add(semaphore);
|
||||
final Client client = client(node);
|
||||
final String name = "indexer_" + indexers.size();
|
||||
final int numPrimaries = getNumShards("test").numPrimaries;
|
||||
Thread thread = new Thread(() -> {
|
||||
while (!stop.get()) {
|
||||
String id = null;
|
||||
try {
|
||||
if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
|
||||
continue;
|
||||
}
|
||||
logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
|
||||
try {
|
||||
id = Integer.toString(idGenerator.incrementAndGet());
|
||||
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
|
||||
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
|
||||
IndexResponse response =
|
||||
client.prepareIndex("test", "type", id)
|
||||
.setSource("{}", XContentType.JSON)
|
||||
.setTimeout(timeout)
|
||||
.get(timeout);
|
||||
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
|
||||
ackedDocs.put(id, node);
|
||||
logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
|
||||
} catch (ElasticsearchException e) {
|
||||
exceptedExceptions.add(e);
|
||||
final String docId = id;
|
||||
logger.trace(
|
||||
(Supplier<?>)
|
||||
() -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e);
|
||||
} finally {
|
||||
countDownLatchRef.get().countDown();
|
||||
logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// fine - semaphore interrupt
|
||||
} catch (AssertionError | Exception e) {
|
||||
logger.info(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("unexpected exception in background thread of [{}]", node),
|
||||
e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
thread.setName(name);
|
||||
thread.start();
|
||||
indexers.add(thread);
|
||||
}
|
||||
|
||||
int docsPerIndexer = randomInt(3);
|
||||
logger.info("indexing {} docs per indexer before partition", docsPerIndexer);
|
||||
countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
|
||||
for (Semaphore semaphore : semaphores) {
|
||||
semaphore.release(docsPerIndexer);
|
||||
}
|
||||
assertTrue(countDownLatchRef.get().await(1, TimeUnit.MINUTES));
|
||||
|
||||
for (int iter = 1 + randomInt(2); iter > 0; iter--) {
|
||||
logger.info("starting disruptions & indexing (iteration [{}])", iter);
|
||||
disruptionScheme.startDisrupting();
|
||||
|
||||
docsPerIndexer = 1 + randomInt(5);
|
||||
logger.info("indexing {} docs per indexer during partition", docsPerIndexer);
|
||||
countDownLatchRef.set(new CountDownLatch(docsPerIndexer * indexers.size()));
|
||||
Collections.shuffle(semaphores, random());
|
||||
for (Semaphore semaphore : semaphores) {
|
||||
assertThat(semaphore.availablePermits(), equalTo(0));
|
||||
semaphore.release(docsPerIndexer);
|
||||
}
|
||||
logger.info("waiting for indexing requests to complete");
|
||||
assertTrue(countDownLatchRef.get().await(docsPerIndexer * seconds * 1000 + 2000, TimeUnit.MILLISECONDS));
|
||||
|
||||
logger.info("stopping disruption");
|
||||
disruptionScheme.stopDisrupting();
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
|
||||
DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
|
||||
}
|
||||
// in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the master
|
||||
// is the super-connected node and recovery source and target are on opposite sides of the bridge
|
||||
if (disruptionScheme instanceof NetworkDisruption &&
|
||||
((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) {
|
||||
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
|
||||
}
|
||||
ensureGreen("test");
|
||||
|
||||
logger.info("validating successful docs");
|
||||
assertBusy(() -> {
|
||||
for (String node : nodes) {
|
||||
try {
|
||||
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
|
||||
for (String id : ackedDocs.keySet()) {
|
||||
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
|
||||
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
|
||||
}
|
||||
} catch (AssertionError | NoShardAvailableActionException e) {
|
||||
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
|
||||
}
|
||||
}
|
||||
}, 30, TimeUnit.SECONDS);
|
||||
|
||||
logger.info("done validating (iteration [{}])", iter);
|
||||
}
|
||||
} finally {
|
||||
if (exceptedExceptions.size() > 0) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Exception e : exceptedExceptions) {
|
||||
sb.append("\n").append(e.getMessage());
|
||||
}
|
||||
logger.debug("Indexing exceptions during disruption: {}", sb);
|
||||
}
|
||||
logger.info("shutting down indexers");
|
||||
stop.set(true);
|
||||
for (Thread indexer : indexers) {
|
||||
indexer.interrupt();
|
||||
indexer.join(60000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a document which is indexed on the majority side of a partition, is available from the minority side,
|
||||
* once the partition is healed
|
||||
*/
|
||||
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
|
||||
List<String> nodes = startCluster(3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
)
|
||||
.get());
|
||||
ensureGreen("test");
|
||||
|
||||
nodes = new ArrayList<>(nodes);
|
||||
Collections.shuffle(nodes, random());
|
||||
String isolatedNode = nodes.get(0);
|
||||
String notIsolatedNode = nodes.get(1);
|
||||
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
NetworkDisruption scheme = addRandomDisruptionType(partitions);
|
||||
scheme.startDisrupting();
|
||||
ensureStableCluster(2, notIsolatedNode);
|
||||
assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
|
||||
|
||||
|
||||
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
|
||||
.get();
|
||||
assertThat(indexResponse.getVersion(), equalTo(1L));
|
||||
|
||||
logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
|
||||
GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId())
|
||||
.setPreference("_local")
|
||||
.get();
|
||||
assertThat(getResponse.isExists(), is(true));
|
||||
assertThat(getResponse.getVersion(), equalTo(1L));
|
||||
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
||||
|
||||
scheme.stopDisrupting();
|
||||
|
||||
ensureStableCluster(3);
|
||||
ensureGreen("test");
|
||||
|
||||
for (String node : nodes) {
|
||||
logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node);
|
||||
getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId())
|
||||
.setPreference("_local")
|
||||
.get();
|
||||
assertThat(getResponse.isExists(), is(true));
|
||||
assertThat(getResponse.getVersion(), equalTo(1L));
|
||||
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
|
||||
}
|
||||
}
|
||||
|
||||
// simulate handling of sending shard failure during an isolation
|
||||
public void testSendingShardFailure() throws Exception {
|
||||
List<String> nodes = startCluster(3, 2);
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
|
||||
String nonMasterNode = randomFrom(nonMasterNodes);
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
));
|
||||
ensureGreen();
|
||||
String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
|
||||
|
||||
// fail a random shard
|
||||
ShardRouting failedShard =
|
||||
randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
|
||||
ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicBoolean success = new AtomicBoolean();
|
||||
|
||||
String isolatedNode = randomBoolean() ? masterNode : nonMasterNode;
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
// we cannot use the NetworkUnresponsive disruption type here as it will swallow the "shard failed" request, calling neither
|
||||
// onSuccess nor onFailure on the provided listener.
|
||||
NetworkLinkDisruptionType disruptionType = new NetworkDisconnect();
|
||||
NetworkDisruption networkDisruption = new NetworkDisruption(partitions, disruptionType);
|
||||
setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
|
||||
ShardStateAction.Listener() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
success.set(true);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
success.set(false);
|
||||
latch.countDown();
|
||||
assert false;
|
||||
}
|
||||
});
|
||||
|
||||
if (isolatedNode.equals(nonMasterNode)) {
|
||||
assertNoMaster(nonMasterNode);
|
||||
} else {
|
||||
ensureStableCluster(2, nonMasterNode);
|
||||
}
|
||||
|
||||
// heal the partition
|
||||
networkDisruption.removeAndEnsureHealthy(internalCluster());
|
||||
|
||||
// the cluster should stabilize
|
||||
ensureStableCluster(3);
|
||||
|
||||
latch.await();
|
||||
|
||||
// the listener should be notified
|
||||
assertTrue(success.get());
|
||||
|
||||
// the failed shard should be gone
|
||||
List<ShardRouting> shards = clusterService().state().getRoutingTable().allShards("test");
|
||||
for (ShardRouting shard : shards) {
|
||||
assertThat(shard.allocationId(), not(equalTo(failedShard.allocationId())));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test creates a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the target
|
||||
* node but already deleted on the source node. Search request should still work.
|
||||
*/
|
||||
public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception {
|
||||
// don't use DEFAULT settings (which can cause node disconnects on a slow CI machine)
|
||||
configureCluster(Settings.EMPTY, 3, null, 1);
|
||||
final String masterNode = internalCluster().startMasterOnlyNode();
|
||||
final String node_1 = internalCluster().startDataOnlyNode();
|
||||
|
||||
logger.info("--> creating index [test] with one shard and on replica");
|
||||
assertAcked(prepareCreate("test").setSettings(
|
||||
Settings.builder().put(indexSettings())
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
|
||||
);
|
||||
ensureGreen("test");
|
||||
|
||||
final String node_2 = internalCluster().startDataOnlyNode();
|
||||
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc")
|
||||
.setSource("{\"int_field\":1}", XContentType.JSON));
|
||||
}
|
||||
indexRandom(true, indexRequestBuilderList);
|
||||
|
||||
IndicesStoreIntegrationIT.relocateAndBlockCompletion(logger, "test", 0, node_1, node_2);
|
||||
// now search for the documents and see if we get a reply
|
||||
assertThat(client().prepareSearch().setSize(0).get().getHits().getTotalHits(), equalTo(100L));
|
||||
}
|
||||
|
||||
public void testIndexImportedFromDataOnlyNodesIfMasterLostDataFolder() throws Exception {
|
||||
// test for https://github.com/elastic/elasticsearch/issues/8823
|
||||
configureCluster(2, null, 1);
|
||||
String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY);
|
||||
internalCluster().startDataOnlyNode(Settings.EMPTY);
|
||||
|
||||
ensureStableCluster(2);
|
||||
assertAcked(prepareCreate("index").setSettings(Settings.builder().put("index.number_of_replicas", 0)));
|
||||
index("index", "doc", "1", jsonBuilder().startObject().field("text", "some text").endObject());
|
||||
ensureGreen();
|
||||
|
||||
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
|
||||
@Override
|
||||
public boolean clearData(String nodeName) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
ensureGreen("index");
|
||||
assertTrue(client().prepareGet("index", "doc", "1").get().isExists());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that indices are properly deleted even if there is a master transition in between.
|
||||
* Test for https://github.com/elastic/elasticsearch/issues/11665
|
||||
*/
|
||||
public void testIndicesDeleted() throws Exception {
|
||||
final Settings settings = Settings.builder()
|
||||
.put(DEFAULT_SETTINGS)
|
||||
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node
|
||||
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed
|
||||
.build();
|
||||
final String idxName = "test";
|
||||
configureCluster(settings, 3, null, 2);
|
||||
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2);
|
||||
final String dataNode = internalCluster().startDataOnlyNode();
|
||||
ensureStableCluster(3);
|
||||
assertAcked(prepareCreate("test"));
|
||||
|
||||
final String masterNode1 = internalCluster().getMasterName();
|
||||
NetworkDisruption networkDisruption =
|
||||
new NetworkDisruption(new TwoPartitions(masterNode1, dataNode), new NetworkDisruption.NetworkUnresponsive());
|
||||
internalCluster().setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
// We know this will time out due to the partition, we check manually below to not proceed until
|
||||
// the delete has been applied to the master node and the master eligible node.
|
||||
internalCluster().client(masterNode1).admin().indices().prepareDelete(idxName).setTimeout("0s").get();
|
||||
// Don't restart the master node until we know the index deletion has taken effect on master and the master eligible node.
|
||||
assertBusy(() -> {
|
||||
for (String masterNode : allMasterEligibleNodes) {
|
||||
final ClusterState masterState = internalCluster().clusterService(masterNode).state();
|
||||
assertTrue("index not deleted on " + masterNode, masterState.metaData().hasIndex(idxName) == false);
|
||||
}
|
||||
});
|
||||
internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK);
|
||||
ensureYellow();
|
||||
assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,320 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.MembershipAction;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
/**
|
||||
* Tests for discovery during disruptions.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
public void testIsolatedUnicastNodes() throws Exception {
|
||||
List<String> nodes = startCluster(4, -1, new int[]{0});
|
||||
// Figure out what is the elected master node
|
||||
final String unicastTarget = nodes.get(0);
|
||||
|
||||
Set<String> unicastTargetSide = new HashSet<>();
|
||||
unicastTargetSide.add(unicastTarget);
|
||||
|
||||
Set<String> restOfClusterSide = new HashSet<>();
|
||||
restOfClusterSide.addAll(nodes);
|
||||
restOfClusterSide.remove(unicastTarget);
|
||||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unicast target node and the rest of the cluster
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide),
|
||||
new NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(3, nodes.get(1));
|
||||
|
||||
// The isolate master node must report no master, so it starts with pinging
|
||||
assertNoMaster(unicastTarget);
|
||||
networkDisconnect.stopDisrupting();
|
||||
// Wait until the master node sees all 3 nodes again.
|
||||
ensureStableCluster(4);
|
||||
}
|
||||
|
||||
/**
|
||||
* A 4 node cluster with m_m_n set to 3 and each node has one unicast endpoint. One node partitions from the master node.
|
||||
* The temporal unicast responses is empty. When partition is solved the one ping response contains a master node.
|
||||
* The rejoining node should take this master node and connect.
|
||||
*/
|
||||
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
|
||||
List<String> nodes = startCluster(4, -1, new int[]{0});
|
||||
// Figure out what is the elected master node
|
||||
final String masterNode = internalCluster().getMasterName();
|
||||
logger.info("---> legit elected master node={}", masterNode);
|
||||
List<String> otherNodes = new ArrayList<>(nodes);
|
||||
otherNodes.remove(masterNode);
|
||||
otherNodes.remove(nodes.get(0)); // <-- Don't isolate the node that is in the unicast endpoint for all the other nodes.
|
||||
final String isolatedNode = otherNodes.get(0);
|
||||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing();
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unlucky node and elected master node in both directions.
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode),
|
||||
new NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(3, masterNode);
|
||||
|
||||
// The isolate master node must report no master, so it starts with pinging
|
||||
assertNoMaster(isolatedNode);
|
||||
networkDisconnect.stopDisrupting();
|
||||
// Wait until the master node sees all 4 nodes again.
|
||||
ensureStableCluster(4);
|
||||
// The elected master shouldn't have changed, since the isolated node never could have elected himself as
|
||||
// master since m_m_n of 3 could never be satisfied.
|
||||
assertMaster(masterNode, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test cluster join with issues in cluster state publishing *
|
||||
*/
|
||||
public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
|
||||
List<String> nodes = startCluster(2, 1);
|
||||
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
String nonMasterNode;
|
||||
if (masterNode.equals(nodes.get(0))) {
|
||||
nonMasterNode = nodes.get(1);
|
||||
} else {
|
||||
nonMasterNode = nodes.get(0);
|
||||
}
|
||||
|
||||
DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
|
||||
|
||||
TransportService masterTranspotService =
|
||||
internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
|
||||
|
||||
logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
|
||||
MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
||||
nonMasterNode);
|
||||
nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
|
||||
|
||||
assertNoMaster(nonMasterNode);
|
||||
|
||||
logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
|
||||
MockTransportService masterTransportService =
|
||||
(MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
|
||||
TransportService localTransportService =
|
||||
internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName());
|
||||
if (randomBoolean()) {
|
||||
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
|
||||
} else {
|
||||
masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
|
||||
}
|
||||
|
||||
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(2);
|
||||
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
|
||||
.original()) {
|
||||
@Override
|
||||
protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException {
|
||||
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
super.sendRequest(connection, requestId, action, request, options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
|
||||
return super.openConnection(node, profile);
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
countDownLatch.await();
|
||||
|
||||
logger.info("waiting for cluster to reform");
|
||||
masterTransportService.clearRule(localTransportService);
|
||||
nonMasterTransportService.clearRule(localTransportService);
|
||||
|
||||
ensureStableCluster(2);
|
||||
|
||||
// shutting down the nodes, to avoid the leakage check tripping
|
||||
// on the states associated with the commit requests we may have dropped
|
||||
internalCluster().stopRandomNonMasterNode();
|
||||
}
|
||||
|
||||
public void testClusterFormingWithASlowNode() throws Exception {
|
||||
configureCluster(3, null, 2);
|
||||
|
||||
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
|
||||
|
||||
// don't wait for initial state, we want to add the disruption while the cluster is forming
|
||||
internalCluster().startNodes(3, Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s").build());
|
||||
|
||||
logger.info("applying disruption while cluster is forming ...");
|
||||
|
||||
internalCluster().setDisruptionScheme(disruption);
|
||||
disruption.startDisrupting();
|
||||
|
||||
ensureStableCluster(3);
|
||||
}
|
||||
|
||||
public void testElectMasterWithLatestVersion() throws Exception {
|
||||
configureCluster(3, null, 2);
|
||||
final Set<String> nodes = new HashSet<>(internalCluster().startNodes(3));
|
||||
ensureStableCluster(3);
|
||||
ServiceDisruptionScheme isolateAllNodes =
|
||||
new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election to make sure \"preferred\" master is elected");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node : nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
internalCluster().clearDisruptionScheme();
|
||||
ensureStableCluster(3);
|
||||
final String preferredMasterName = internalCluster().getMasterName();
|
||||
final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode();
|
||||
assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId()));
|
||||
}
|
||||
|
||||
logger.info("--> preferred master is {}", preferredMaster);
|
||||
final Set<String> nonPreferredNodes = new HashSet<>(nodes);
|
||||
nonPreferredNodes.remove(preferredMasterName);
|
||||
final ServiceDisruptionScheme isolatePreferredMaster =
|
||||
new NetworkDisruption(
|
||||
new NetworkDisruption.TwoPartitions(
|
||||
Collections.singleton(preferredMasterName), nonPreferredNodes),
|
||||
new NetworkDisconnect());
|
||||
internalCluster().setDisruptionScheme(isolatePreferredMaster);
|
||||
isolatePreferredMaster.startDisrupting();
|
||||
|
||||
assertAcked(client(randomFrom(nonPreferredNodes)).admin().indices().prepareCreate("test").setSettings(
|
||||
INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1,
|
||||
INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0
|
||||
));
|
||||
|
||||
internalCluster().clearDisruptionScheme(false);
|
||||
internalCluster().setDisruptionScheme(isolateAllNodes);
|
||||
|
||||
logger.info("--> forcing a complete election again");
|
||||
isolateAllNodes.startDisrupting();
|
||||
for (String node : nodes) {
|
||||
assertNoMaster(node);
|
||||
}
|
||||
|
||||
isolateAllNodes.stopDisrupting();
|
||||
|
||||
final ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||
if (state.metaData().hasIndex("test") == false) {
|
||||
fail("index 'test' was lost. current cluster state: " + state);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an asymmetric break between a master and one of the nodes and makes
|
||||
* sure that the node is removed form the cluster, that the node start pinging and that
|
||||
* the cluster reforms when healed.
|
||||
*/
|
||||
public void testNodeNotReachableFromMaster() throws Exception {
|
||||
startCluster(3);
|
||||
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
String nonMasterNode = null;
|
||||
while (nonMasterNode == null) {
|
||||
nonMasterNode = randomFrom(internalCluster().getNodeNames());
|
||||
if (nonMasterNode.equals(masterNode)) {
|
||||
nonMasterNode = null;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
|
||||
MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
|
||||
masterNode);
|
||||
if (randomBoolean()) {
|
||||
masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
|
||||
} else {
|
||||
masterTransportService.addFailToSendNoConnectRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
|
||||
}
|
||||
|
||||
logger.info("waiting for [{}] to be removed from cluster", nonMasterNode);
|
||||
ensureStableCluster(2, masterNode);
|
||||
|
||||
logger.info("waiting for [{}] to have no master", nonMasterNode);
|
||||
assertNoMaster(nonMasterNode);
|
||||
|
||||
logger.info("healing partition and checking cluster reforms");
|
||||
masterTransportService.clearAllRules();
|
||||
|
||||
ensureStableCluster(3);
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,466 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.LongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
* Tests relating to the loss of the master.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false)
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE")
|
||||
public class MasterDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
/**
|
||||
* Test that no split brain occurs under partial network partition. See https://github.com/elastic/elasticsearch/issues/2488
|
||||
*/
|
||||
public void testFailWithMinimumMasterNodesConfigured() throws Exception {
|
||||
List<String> nodes = startCluster(3);
|
||||
|
||||
// Figure out what is the elected master node
|
||||
final String masterNode = internalCluster().getMasterName();
|
||||
logger.info("---> legit elected master node={}", masterNode);
|
||||
|
||||
// Pick a node that isn't the elected master.
|
||||
Set<String> nonMasters = new HashSet<>(nodes);
|
||||
nonMasters.remove(masterNode);
|
||||
final String unluckyNode = randomFrom(nonMasters.toArray(Strings.EMPTY_ARRAY));
|
||||
|
||||
|
||||
// Simulate a network issue between the unlucky node and elected master node in both directions.
|
||||
|
||||
NetworkDisruption networkDisconnect = new NetworkDisruption(
|
||||
new NetworkDisruption.TwoPartitions(masterNode, unluckyNode),
|
||||
new NetworkDisruption.NetworkDisconnect());
|
||||
setDisruptionScheme(networkDisconnect);
|
||||
networkDisconnect.startDisrupting();
|
||||
|
||||
// Wait until elected master has removed that the unlucky node...
|
||||
ensureStableCluster(2, masterNode);
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
assertNoMaster(unluckyNode);
|
||||
|
||||
networkDisconnect.stopDisrupting();
|
||||
|
||||
// Wait until the master node sees all 3 nodes again.
|
||||
ensureStableCluster(3);
|
||||
|
||||
// The elected master shouldn't have changed, since the unlucky node never could have elected himself as
|
||||
// master since m_m_n of 2 could never be satisfied.
|
||||
assertMaster(masterNode, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that nodes fault detection works after master (re) election
|
||||
*/
|
||||
public void testNodesFDAfterMasterReelection() throws Exception {
|
||||
startCluster(4);
|
||||
|
||||
logger.info("--> stopping current master");
|
||||
internalCluster().stopCurrentMasterNode();
|
||||
|
||||
ensureStableCluster(3);
|
||||
|
||||
logger.info("--> reducing min master nodes to 2");
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
|
||||
.get());
|
||||
|
||||
String master = internalCluster().getMasterName();
|
||||
String nonMaster = null;
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
if (!node.equals(master)) {
|
||||
nonMaster = node;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("--> isolating [{}]", nonMaster);
|
||||
NetworkDisruption.TwoPartitions partitions = isolateNode(nonMaster);
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
logger.info("--> waiting for master to remove it");
|
||||
ensureStableCluster(2, master);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that emulates a frozen elected master node that unfreezes and pushes his cluster state to other nodes
|
||||
* that already are following another elected master node. These nodes should reject this cluster state and prevent
|
||||
* them from following the stale master.
|
||||
*/
|
||||
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
|
||||
public void testStaleMasterNotHijackingMajority() throws Exception {
|
||||
// 3 node cluster with unicast discovery and minimum_master_nodes set to 2:
|
||||
final List<String> nodes = startCluster(3, 2);
|
||||
|
||||
// Save the current master node as old master node, because that node will get frozen
|
||||
final String oldMasterNode = internalCluster().getMasterName();
|
||||
for (String node : nodes) {
|
||||
ensureStableCluster(3, node);
|
||||
}
|
||||
assertMaster(oldMasterNode, nodes);
|
||||
|
||||
// Simulating a painful gc by suspending all threads for a long time on the current elected master node.
|
||||
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(random(), oldMasterNode);
|
||||
|
||||
// Save the majority side
|
||||
final List<String> majoritySide = new ArrayList<>(nodes);
|
||||
majoritySide.remove(oldMasterNode);
|
||||
|
||||
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
|
||||
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
|
||||
String>>>());
|
||||
for (final String node : majoritySide) {
|
||||
masters.put(node, new ArrayList<Tuple<String, String>>());
|
||||
internalCluster().getInstance(ClusterService.class, node).addListener(event -> {
|
||||
DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
|
||||
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
|
||||
if (!Objects.equals(previousMaster, currentMaster)) {
|
||||
logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
|
||||
event.previousState());
|
||||
String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
|
||||
String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
|
||||
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final CountDownLatch oldMasterNodeSteppedDown = new CountDownLatch(1);
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).addListener(event -> {
|
||||
if (event.state().nodes().getMasterNodeId() == null) {
|
||||
oldMasterNodeSteppedDown.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
logger.info("freezing node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
// Wait for the majority side to get stable
|
||||
assertDifferentMaster(majoritySide.get(0), oldMasterNode);
|
||||
assertDifferentMaster(majoritySide.get(1), oldMasterNode);
|
||||
|
||||
// the test is periodically tripping on the following assertion. To find out which threads are blocking the nodes from making
|
||||
// progress we print a stack dump
|
||||
boolean failed = true;
|
||||
try {
|
||||
assertDiscoveryCompleted(majoritySide);
|
||||
failed = false;
|
||||
} finally {
|
||||
if (failed) {
|
||||
logger.error("discovery failed to complete, probably caused by a blocked thread: {}",
|
||||
new HotThreads().busiestThreads(Integer.MAX_VALUE).ignoreIdleThreads(false).detect());
|
||||
}
|
||||
}
|
||||
|
||||
// The old master node is frozen, but here we submit a cluster state update task that doesn't get executed,
|
||||
// but will be queued and once the old master node un-freezes it gets executed.
|
||||
// The old master node will send this update + the cluster state where he is flagged as master to the other
|
||||
// nodes that follow the new master. These nodes should ignore this update.
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
|
||||
ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
return ClusterState.builder(currentState).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failure [{}]", source), e);
|
||||
}
|
||||
});
|
||||
|
||||
// Save the new elected master node
|
||||
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
|
||||
logger.info("new detected master node [{}]", newMasterNode);
|
||||
|
||||
// Stop disruption
|
||||
logger.info("Unfreeze node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
|
||||
oldMasterNodeSteppedDown.await(30, TimeUnit.SECONDS);
|
||||
// Make sure that the end state is consistent on all nodes:
|
||||
assertDiscoveryCompleted(nodes);
|
||||
assertMaster(newMasterNode, nodes);
|
||||
|
||||
assertThat(masters.size(), equalTo(2));
|
||||
for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
|
||||
String nodeName = entry.getKey();
|
||||
List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
|
||||
assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
|
||||
equalTo(2));
|
||||
assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
|
||||
equalTo(oldMasterNode));
|
||||
assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
|
||||
.get(0).v2(), nullValue());
|
||||
assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
|
||||
nullValue());
|
||||
assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
|
||||
recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
|
||||
*/
|
||||
public void testMasterNodeGCs() throws Exception {
|
||||
List<String> nodes = startCluster(3, -1);
|
||||
|
||||
String oldMasterNode = internalCluster().getMasterName();
|
||||
// a very long GC, but it's OK as we remove the disruption when it has had an effect
|
||||
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), oldMasterNode, 100, 200, 30000, 60000);
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
Set<String> oldNonMasterNodesSet = new HashSet<>(nodes);
|
||||
oldNonMasterNodesSet.remove(oldMasterNode);
|
||||
|
||||
List<String> oldNonMasterNodes = new ArrayList<>(oldNonMasterNodesSet);
|
||||
|
||||
logger.info("waiting for nodes to de-elect master [{}]", oldMasterNode);
|
||||
for (String node : oldNonMasterNodesSet) {
|
||||
assertDifferentMaster(node, oldMasterNode);
|
||||
}
|
||||
|
||||
logger.info("waiting for nodes to elect a new master");
|
||||
ensureStableCluster(2, oldNonMasterNodes.get(0));
|
||||
|
||||
logger.info("waiting for any pinging to stop");
|
||||
assertDiscoveryCompleted(oldNonMasterNodes);
|
||||
|
||||
// restore GC
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
final TimeValue waitTime = new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis());
|
||||
ensureStableCluster(3, waitTime, false, oldNonMasterNodes.get(0));
|
||||
|
||||
// make sure all nodes agree on master
|
||||
String newMaster = internalCluster().getMasterName();
|
||||
assertThat(newMaster, not(equalTo(oldMasterNode)));
|
||||
assertMaster(newMaster, nodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test isolates the master from rest of the cluster, waits for a new master to be elected, restores the partition
|
||||
* and verifies that all node agree on the new cluster state
|
||||
*/
|
||||
@TestLogging(
|
||||
"_root:DEBUG,"
|
||||
+ "org.elasticsearch.cluster.service:TRACE,"
|
||||
+ "org.elasticsearch.gateway:TRACE,"
|
||||
+ "org.elasticsearch.indices.store:TRACE")
|
||||
public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception {
|
||||
final List<String> nodes = startCluster(3);
|
||||
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
|
||||
));
|
||||
|
||||
ensureGreen();
|
||||
String isolatedNode = internalCluster().getMasterName();
|
||||
TwoPartitions partitions = isolateNode(isolatedNode);
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
|
||||
|
||||
// make sure cluster reforms
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
// make sure isolated need picks up on things.
|
||||
assertNoMaster(isolatedNode, TimeValue.timeValueSeconds(40));
|
||||
|
||||
// restore isolation
|
||||
networkDisruption.stopDisrupting();
|
||||
|
||||
for (String node : nodes) {
|
||||
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()),
|
||||
true, node);
|
||||
}
|
||||
|
||||
logger.info("issue a reroute");
|
||||
// trigger a reroute now, instead of waiting for the background reroute of RerouteService
|
||||
assertAcked(client().admin().cluster().prepareReroute());
|
||||
// and wait for it to finish and for the cluster to stabilize
|
||||
ensureGreen("test");
|
||||
|
||||
// verify all cluster states are the same
|
||||
// use assert busy to wait for cluster states to be applied (as publish_timeout has low value)
|
||||
assertBusy(() -> {
|
||||
ClusterState state = null;
|
||||
for (String node : nodes) {
|
||||
ClusterState nodeState = getNodeClusterState(node);
|
||||
if (state == null) {
|
||||
state = nodeState;
|
||||
continue;
|
||||
}
|
||||
// assert nodes are identical
|
||||
try {
|
||||
assertEquals("unequal versions", state.version(), nodeState.version());
|
||||
assertEquals("unequal node count", state.nodes().getSize(), nodeState.nodes().getSize());
|
||||
assertEquals("different masters ", state.nodes().getMasterNodeId(), nodeState.nodes().getMasterNodeId());
|
||||
assertEquals("different meta data version", state.metaData().version(), nodeState.metaData().version());
|
||||
assertEquals("different routing", state.routingTable().toString(), nodeState.routingTable().toString());
|
||||
} catch (AssertionError t) {
|
||||
fail("failed comparing cluster state: " + t.getMessage() + "\n" +
|
||||
"--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state +
|
||||
"\n--- cluster state [" + node + "]: ---\n" + nodeState);
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the proper block is applied when nodes loose their master
|
||||
*/
|
||||
public void testVerifyApiBlocksDuringPartition() throws Exception {
|
||||
startCluster(3);
|
||||
|
||||
// Makes sure that the get request can be executed on each node locally:
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
|
||||
));
|
||||
|
||||
// Everything is stable now, it is now time to simulate evil...
|
||||
// but first make sure we have no initializing shards and all is green
|
||||
// (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
|
||||
ensureGreen("test");
|
||||
|
||||
TwoPartitions partitions = TwoPartitions.random(random(), internalCluster().getNodeNames());
|
||||
NetworkDisruption networkDisruption = addRandomDisruptionType(partitions);
|
||||
|
||||
assertEquals(1, partitions.getMinoritySide().size());
|
||||
final String isolatedNode = partitions.getMinoritySide().iterator().next();
|
||||
assertEquals(2, partitions.getMajoritySide().size());
|
||||
final String nonIsolatedNode = partitions.getMajoritySide().iterator().next();
|
||||
|
||||
// Simulate a network issue between the unlucky node and the rest of the cluster.
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
|
||||
assertNoMaster(isolatedNode, DiscoverySettings.NO_MASTER_BLOCK_WRITES, TimeValue.timeValueSeconds(10));
|
||||
|
||||
|
||||
logger.info("wait until elected master has been removed and a new 2 node cluster was from (via [{}])", isolatedNode);
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
for (String node : partitions.getMajoritySide()) {
|
||||
ClusterState nodeState = getNodeClusterState(node);
|
||||
boolean success = true;
|
||||
if (nodeState.nodes().getMasterNode() == null) {
|
||||
success = false;
|
||||
}
|
||||
if (!nodeState.blocks().global().isEmpty()) {
|
||||
success = false;
|
||||
}
|
||||
if (!success) {
|
||||
fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n"
|
||||
+ nodeState);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
networkDisruption.stopDisrupting();
|
||||
|
||||
// Wait until the master node sees al 3 nodes again.
|
||||
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()));
|
||||
|
||||
logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all");
|
||||
client().admin().cluster().prepareUpdateSettings()
|
||||
.setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"))
|
||||
.get();
|
||||
|
||||
networkDisruption.startDisrupting();
|
||||
|
||||
|
||||
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
|
||||
// continuously ping until network failures have been resolved. However
|
||||
// It may a take a bit before the node detects it has been cut off from the elected master
|
||||
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
|
||||
assertNoMaster(isolatedNode, DiscoverySettings.NO_MASTER_BLOCK_ALL, TimeValue.timeValueSeconds(10));
|
||||
|
||||
// make sure we have stable cluster & cross partition recoveries are canceled by the removal of the missing node
|
||||
// the unresponsive partition causes recoveries to only time out after 15m (default) and these will cause
|
||||
// the test to fail due to unfreed resources
|
||||
ensureStableCluster(2, nonIsolatedNode);
|
||||
|
||||
}
|
||||
|
||||
void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
|
||||
for (final String node : nodes) {
|
||||
assertTrue(
|
||||
"node [" + node + "] is still joining master",
|
||||
awaitBusy(
|
||||
() -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(),
|
||||
30,
|
||||
TimeUnit.SECONDS
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -410,6 +410,7 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
|
|||
public TimeValue expectedTimeToHeal() {
|
||||
return TimeValue.timeValueMillis(0);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -501,4 +502,5 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
|
|||
return "network delays for [" + delay + "]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.test.disruption;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -44,8 +43,8 @@ public class NetworkDisruptionIT extends ESIntegTestCase {
|
|||
public void testNetworkPartitionWithNodeShutdown() throws IOException {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
String[] nodeNames = internalCluster().getNodeNames();
|
||||
NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]),
|
||||
new NetworkUnresponsive());
|
||||
NetworkDisruption networkDisruption =
|
||||
new NetworkDisruption(new TwoPartitions(nodeNames[0], nodeNames[1]), new NetworkDisruption.NetworkUnresponsive());
|
||||
internalCluster().setDisruptionScheme(networkDisruption);
|
||||
networkDisruption.startDisrupting();
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames[0]));
|
||||
|
|
Loading…
Reference in New Issue