Tests: Faster recovery from simulated disurptions

In testing infra, one can simulate node GCs, network issues and other problems by adding a disruption to the test cluster. Those disruption are automatically removed after the test is done. At the moment each disruption indicates how long it will take the cluster to heal once the disruption is removed and the test cluster waits for this amount of time. However, more often than not this is an upper bound, causing a much longer wait than needed. Instead we should push the responsibility of healing to the disruption it self, where we can be smarter about what we wait for.

Closes #12071
This commit is contained in:
Boaz Leskes 2015-06-12 17:59:25 +02:00
parent 4f9855261a
commit 67318ce7ba
8 changed files with 64 additions and 25 deletions

View File

@ -220,7 +220,7 @@ public final class InternalTestCluster extends TestCluster {
public InternalTestCluster(long clusterSeed, Path baseDir, public InternalTestCluster(long clusterSeed, Path baseDir,
int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix) { boolean enableHttpPipelining, String nodePrefix) {
super(clusterSeed); super(clusterSeed);
this.baseDir = baseDir; this.baseDir = baseDir;
this.clusterName = clusterName; this.clusterName = clusterName;
@ -281,8 +281,8 @@ public final class InternalTestCluster extends TestCluster {
} }
builder.put("path.home", baseDir); builder.put("path.home", baseDir);
builder.put("path.repo", baseDir.resolve("repos")); builder.put("path.repo", baseDir.resolve("repos"));
builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT+100)); builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT + 100));
builder.put("http.port", BASE_PORT+101 + "-" + (BASE_PORT+200)); builder.put("http.port", BASE_PORT + 101 + "-" + (BASE_PORT + 200));
builder.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true); builder.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true);
builder.put("node.mode", NODE_MODE); builder.put("node.mode", NODE_MODE);
builder.put("http.pipelining", enableHttpPipelining); builder.put("http.pipelining", enableHttpPipelining);
@ -1566,20 +1566,7 @@ public final class InternalTestCluster extends TestCluster {
if (activeDisruptionScheme != null) { if (activeDisruptionScheme != null) {
TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal();
logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime);
activeDisruptionScheme.removeFromCluster(this); activeDisruptionScheme.removeAndEnsureHealthy(this);
// We don't what scheme is picked, certain schemes don't partition the cluster, but process slow, so we need
// to to sleep, cluster health alone doesn't verify if these schemes have been cleared.
if (expectedHealingTime != null && expectedHealingTime.millis() > 0) {
try {
Thread.sleep(expectedHealingTime.millis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
assertFalse("cluster failed to form after disruption was healed", client().admin().cluster().prepareHealth()
.setWaitForNodes("" + nodes.size())
.setWaitForRelocatingShards(0)
.get().isTimedOut());
} }
activeDisruptionScheme = null; activeDisruptionScheme = null;
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -89,6 +90,11 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption {
} }
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
removeFromCluster(cluster);
}
@Override @Override
public TimeValue expectedTimeToHeal() { public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueMinutes(0); return TimeValue.timeValueMinutes(0);

View File

@ -21,12 +21,15 @@ package org.elasticsearch.test.disruption;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
import java.util.HashSet; import java.util.HashSet;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.junit.Assert.assertFalse;
/** /**
* Suspends all threads on the specified node in order to simulate a long gc. * Suspends all threads on the specified node in order to simulate a long gc.
*/ */
@ -63,6 +66,12 @@ public class LongGCDisruption extends SingleNodeDisruption {
} }
} }
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
removeFromCluster(cluster);
ensureNodeCount(cluster);
}
@Override @Override
public TimeValue expectedTimeToHeal() { public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueMillis(0); return TimeValue.timeValueMillis(0);

View File

@ -32,6 +32,8 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import static org.junit.Assert.assertFalse;
public abstract class NetworkPartition implements ServiceDisruptionScheme { public abstract class NetworkPartition implements ServiceDisruptionScheme {
protected final ESLogger logger = Loggers.getLogger(getClass()); protected final ESLogger logger = Loggers.getLogger(getClass());
@ -105,6 +107,19 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
stopDisrupting(); stopDisrupting();
} }
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
removeFromCluster(cluster);
ensureNodeCount(cluster);
}
protected void ensureNodeCount(InternalTestCluster cluster) {
assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
.setWaitForNodes("" + cluster.size())
.setWaitForRelocatingShards(0)
.get().isTimedOut());
}
@Override @Override
public synchronized void applyToNode(String node, InternalTestCluster cluster) { public synchronized void applyToNode(String node, InternalTestCluster cluster) {
if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) { if (!autoExpand || nodesSideOne.contains(node) || nodesSideTwo.contains(node)) {

View File

@ -59,6 +59,11 @@ public class NoOpDisruptionScheme implements ServiceDisruptionScheme {
} }
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
}
@Override @Override
public TimeValue expectedTimeToHeal() { public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueSeconds(0); return TimeValue.timeValueSeconds(0);

View File

@ -23,20 +23,22 @@ import org.elasticsearch.test.InternalTestCluster;
public interface ServiceDisruptionScheme { public interface ServiceDisruptionScheme {
public void applyToCluster(InternalTestCluster cluster); void applyToCluster(InternalTestCluster cluster);
public void removeFromCluster(InternalTestCluster cluster); void removeFromCluster(InternalTestCluster cluster);
public void applyToNode(String node, InternalTestCluster cluster); void removeAndEnsureHealthy(InternalTestCluster cluster);
public void removeFromNode(String node, InternalTestCluster cluster); void applyToNode(String node, InternalTestCluster cluster);
public void startDisrupting(); void removeFromNode(String node, InternalTestCluster cluster);
public void stopDisrupting(); void startDisrupting();
public void testClusterClosed(); void stopDisrupting();
public TimeValue expectedTimeToHeal(); void testClusterClosed();
TimeValue expectedTimeToHeal();
} }

View File

@ -24,6 +24,8 @@ import org.elasticsearch.test.InternalTestCluster;
import java.util.Random; import java.util.Random;
import static org.junit.Assert.assertFalse;
public abstract class SingleNodeDisruption implements ServiceDisruptionScheme { public abstract class SingleNodeDisruption implements ServiceDisruptionScheme {
protected final ESLogger logger = Loggers.getLogger(getClass()); protected final ESLogger logger = Loggers.getLogger(getClass());
@ -80,4 +82,10 @@ public abstract class SingleNodeDisruption implements ServiceDisruptionScheme {
disruptedNode = null; disruptedNode = null;
} }
protected void ensureNodeCount(InternalTestCluster cluster) {
assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth()
.setWaitForNodes("" + cluster.size())
.setWaitForRelocatingShards(0)
.get().isTimedOut());
}
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.InternalTestCluster;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -132,6 +133,12 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
return true; return true;
} }
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
removeFromCluster(cluster);
ensureNodeCount(cluster);
}
@Override @Override
public TimeValue expectedTimeToHeal() { public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueMillis(0); return TimeValue.timeValueMillis(0);