[TEST] Added LongGCDisruption and a test simulating GC on master nodes

Also rename DiscoveryWithNetworkFailuresTests to DiscoveryWithServiceDisruptions which better suites what we do.
This commit is contained in:
Boaz Leskes 2014-07-29 21:16:25 +02:00
parent 4b8456e954
commit 50f852ffeb
5 changed files with 293 additions and 15 deletions
pom.xml
src
main/java/org/elasticsearch/discovery/zen
test/java/org/elasticsearch

View File

@ -1219,6 +1219,11 @@
<bundledSignature>jdk-unsafe</bundledSignature>
<bundledSignature>jdk-deprecated</bundledSignature>
</bundledSignatures>
<excludes>
<!-- start exclude for test GC simulation using Thread.suspend -->
<exclude>org/elasticsearch/test/disruption/LongGCDisruption.class</exclude>
<!-- end exclude for Channels -->
</excludes>
<signaturesFiles>
<signaturesFile>test-signatures.txt</signaturesFile>
<signaturesFile>all-signatures.txt</signaturesFile>

View File

@ -313,6 +313,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
});
}
/**
* returns true if there is a currently a background thread active for (re)joining the cluster
* used for testing.
*/
public boolean joiningCluster() {
return currentJoinThread != null;
}
private void innerJoinCluster() {
boolean retry = true;
while (retry) {
@ -410,7 +419,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("failed to send join request to master [{}]", t);
logger.trace("failed to send join request to master [{}]", t, masterNode);
} else {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(t));
}

View File

@ -120,7 +120,8 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
.put("discovery.zen.ping.multicast.enabled", false);
String[] unicastHosts = new String[unicastHostOrdinals.length];
if (InternalTestCluster.NODE_MODE.equals("local")) {
String mode = baseSettings.get("node.mode", InternalTestCluster.NODE_MODE);
if (mode.equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "node_" + unicastHostOrdinals[i];

View File

@ -19,9 +19,11 @@
package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@ -39,6 +41,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
@ -62,14 +65,14 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@LuceneTestCase.Slow
@TestLogging("discovery.zen:TRACE")
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTest {
private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
@ -109,8 +112,9 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
final static Settings DEFAULT_SETTINGS = ImmutableSettings.builder()
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_timeout", "1s") // for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // for hitting simulated network failures quickly
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
@ -136,21 +140,26 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
return nodes;
}
private List<String> startUnicastCluster(int numberOfNodes,@Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
return startUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode, ImmutableSettings.EMPTY);
}
private List<String> startUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode, Settings settings) throws ExecutionException, InterruptedException {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
// TODO: Rarely use default settings form some of these
Settings settings = ImmutableSettings.builder()
Settings nodeSettings = ImmutableSettings.builder()
.put(DEFAULT_SETTINGS)
.put(settings)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, minimumMasterNode)
.build();
if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, settings, unicastHostsOrdinals);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
}
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
@ -494,6 +503,58 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
/**
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testMasterNodeGCs() throws Exception {
// TODO: on mac OS multicast threads are shared between nodes and we therefore we can't simulate GC and stop pinging for just one node
// find a way to block thread creation in the generic thread pool to avoid this.
// TODO: with local transport the threads of the source node enter the target node, since everything is local and like above we can't simulate GC on one node
// with netty transport the threads of different nodes don't touch each other due to the network threading Netty uses
List<String> nodes = startUnicastCluster(3, null, -1, ImmutableSettings.builder().put("node.mode", "network").build());
String oldMasterNode = internalCluster().getMasterName();
// a very long GC, but it's OK as we remove the disruption when it has had an effect
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(oldMasterNode, getRandom(), 100, 200, 30000, 60000);
internalCluster().setDisruptionScheme(masterNodeDisruption);
masterNodeDisruption.startDisrupting();
Set<String> oldNonMasterNodesSet = new HashSet<>(nodes);
oldNonMasterNodesSet.remove(oldMasterNode);
List<String> oldNonMasterNodes = new ArrayList<>(oldNonMasterNodesSet);
logger.info("waiting for nodes to de-elect master [{}]", oldMasterNode);
for (String node : oldNonMasterNodesSet) {
assertDifferentMaster(node, oldMasterNode);
}
logger.info("waiting for nodes to elect a new master");
ensureStableCluster(2, oldNonMasterNodes.get(0));
logger.info("waiting for any pinging to stop");
for (final String node : oldNonMasterNodes) {
assertTrue("node [" + node + "] is still joining master", awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster();
}
}, 30, TimeUnit.SECONDS));
}
// restore GC
masterNodeDisruption.stopDisrupting();
ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + masterNodeDisruption.expectedTimeToHeal().millis()),
oldNonMasterNodes.get(0));
// make sure all nodes agree on master
String newMaster = internalCluster().getMasterName();
assertThat(newMaster, not(equalTo(oldMasterNode)));
assertMaster(newMaster, nodes);
}
/**
* Test that a document which is indexed on the majority side of a partition, is available from the minory side,
* once the partition is healed
@ -559,7 +620,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE")
public void unicastSinglePingResponseContainsMaster() throws Exception {
List<String> nodes = startUnicastCluster(4, new int[] {0}, -1);
List<String> nodes = startUnicastCluster(4, new int[]{0}, -1);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node=" + masterNode);
@ -699,6 +760,9 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
private void ensureStableCluster(int nodeCount, TimeValue timeValue, @Nullable String viaNode) {
if (viaNode == null) {
viaNode = randomFrom(internalCluster().getNodeNames());
}
logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue);
ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
@ -706,6 +770,11 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
.setTimeout(timeValue)
.setWaitForRelocatingShards(0)
.get();
if (clusterHealthResponse.isTimedOut()) {
ClusterStateResponse stateResponse = client(viaNode).admin().cluster().prepareState().get();
fail("failed to reach a stable cluster of [" + nodeCount + "] nodes. Tried via [" + viaNode + "]. last cluster state:\n"
+ stateResponse.getState().prettyPrint());
}
assertThat(clusterHealthResponse.isTimedOut(), is(false));
}
@ -736,11 +805,28 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}, maxWaitTime.getMillis(), TimeUnit.MILLISECONDS);
}
private void assertDifferentMaster(final String node, final String oldMasterNode) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
ClusterState state = getNodeClusterState(node);
String masterNode = null;
if (state.nodes().masterNode() != null) {
masterNode = state.nodes().masterNode().name();
}
logger.trace("[{}] master is [{}]", node, state.nodes().masterNode());
assertThat("node [" + node + "] still has [" + masterNode + "] as master",
oldMasterNode, not(equalTo(masterNode)));
}
}, 10, TimeUnit.SECONDS);
}
private void assertMaster(String masterNode, List<String> nodes) {
for (String node : nodes) {
ClusterState state = getNodeClusterState(node);
assertThat(state.nodes().size(), equalTo(nodes.size()));
assertThat(state.nodes().masterNode().name(), equalTo(masterNode));
String failMsgSuffix = "cluster_state:\n" + state.prettyPrint();
assertThat("wrong node count on [" + node + "]. " + failMsgSuffix, state.nodes().size(), equalTo(nodes.size()));
assertThat("wrong master on node [" + node + "]. " + failMsgSuffix, state.nodes().masterNode().name(), equalTo(masterNode));
}
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.disruption;
import org.elasticsearch.common.unit.TimeValue;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
public class LongGCDisruption extends SingleNodeDisruption {
volatile boolean disrupting;
volatile Thread worker;
final long intervalBetweenDelaysMin;
final long intervalBetweenDelaysMax;
final long delayDurationMin;
final long delayDurationMax;
public LongGCDisruption(Random random) {
this(null, random);
}
public LongGCDisruption(String disruptedNode, Random random) {
this(disruptedNode, random, 100, 200, 300, 20000);
}
public LongGCDisruption(String disruptedNode, Random random, long intervalBetweenDelaysMin,
long intervalBetweenDelaysMax, long delayDurationMin, long delayDurationMax) {
this(random, intervalBetweenDelaysMin, intervalBetweenDelaysMax, delayDurationMin, delayDurationMax);
this.disruptedNode = disruptedNode;
}
public LongGCDisruption(Random random,
long intervalBetweenDelaysMin, long intervalBetweenDelaysMax, long delayDurationMin,
long delayDurationMax) {
super(random);
this.intervalBetweenDelaysMin = intervalBetweenDelaysMin;
this.intervalBetweenDelaysMax = intervalBetweenDelaysMax;
this.delayDurationMin = delayDurationMin;
this.delayDurationMax = delayDurationMax;
}
final static AtomicInteger thread_ids = new AtomicInteger();
@Override
public void startDisrupting() {
disrupting = true;
worker = new Thread(new BackgroundWorker(), "long_gc_simulation_" + thread_ids.incrementAndGet());
worker.setDaemon(true);
worker.start();
}
@Override
public void stopDisrupting() {
if (worker == null) {
return;
}
logger.info("stopping long GCs on [{}]", disruptedNode);
disrupting = false;
worker.interrupt();
try {
worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax));
} catch (InterruptedException e) {
logger.info("background thread failed to stop");
}
worker = null;
}
final static Pattern[] unsafeClasses = new Pattern[]{
// logging has shared JVM locks - we may suspend a thread and block other nodes from doing their thing
Pattern.compile("Logger")
};
private boolean stopNodeThreads(String node, Set<Thread> nodeThreads) {
Set<Thread> allThreadsSet = Thread.getAllStackTraces().keySet();
boolean stopped = false;
final String nodeThreadNamePart = "[" + node + "]";
for (Thread thread : allThreadsSet) {
String name = thread.getName();
if (name.contains(nodeThreadNamePart)) {
if (thread.isAlive() && nodeThreads.add(thread)) {
stopped = true;
thread.suspend();
// double check the thread is not in a shared resource like logging. If so, let it go and come back..
boolean safe = true;
safe:
for (StackTraceElement stackElement : thread.getStackTrace()) {
String className = stackElement.getClassName();
for (Pattern unsafePattern : unsafeClasses) {
if (unsafePattern.matcher(className).find()) {
safe = false;
break safe;
}
}
}
if (!safe) {
thread.resume();
nodeThreads.remove(thread);
}
}
}
}
return stopped;
}
private void resumeThreads(Set<Thread> threads) {
for (Thread thread : threads) {
thread.resume();
}
}
private void simulateLongGC(final TimeValue duration) throws InterruptedException {
final String disruptionNodeCopy = disruptedNode;
if (disruptionNodeCopy == null) {
return;
}
logger.info("node [{}] goes into GC for for [{}]", disruptionNodeCopy, duration);
final Set<Thread> nodeThreads = new HashSet<>();
try {
while (stopNodeThreads(disruptionNodeCopy, nodeThreads)) ;
if (!nodeThreads.isEmpty()) {
Thread.sleep(duration.millis());
}
} finally {
logger.info("node [{}] resumes from GC", disruptionNodeCopy);
resumeThreads(nodeThreads);
}
}
@Override
public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueMillis(0);
}
class BackgroundWorker implements Runnable {
@Override
public void run() {
while (disrupting && disruptedNode != null) {
try {
TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin)));
simulateLongGC(duration);
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
if (disrupting && disruptedNode != null) {
Thread.sleep(duration.millis());
}
} catch (InterruptedException e) {
} catch (Exception e) {
logger.error("error in background worker", e);
}
}
}
}
}