From 67318ce7ba3711907ec18de1b435b4a41e9a4edb Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 12 Jun 2015 17:59:25 +0200 Subject: [PATCH] Tests: Faster recovery from simulated disruptions In testing infra, one can simulate node GCs, network issues and other problems by adding a disruption to the test cluster. Those disruptions are automatically removed after the test is done. At the moment each disruption indicates how long it will take the cluster to heal once the disruption is removed and the test cluster waits for this amount of time. However, more often than not this is an upper bound, causing a much longer wait than needed. Instead we should push the responsibility of healing to the disruption itself, where we can be smarter about what we wait for. Closes #12071 --- .../test/InternalTestCluster.java | 21 ++++--------------- .../BlockClusterStateProcessing.java | 6 ++++++ .../test/disruption/LongGCDisruption.java | 9 ++++++++ .../test/disruption/NetworkPartition.java | 15 +++++++++++++ .../test/disruption/NoOpDisruptionScheme.java | 5 +++++ .../disruption/ServiceDisruptionScheme.java | 18 +++++++++------- .../test/disruption/SingleNodeDisruption.java | 8 +++++++ .../SlowClusterStateProcessing.java | 7 +++++++ 8 files changed, 64 insertions(+), 25 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 18e5c9ce083..c1bbd9e796f 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -220,7 +220,7 @@ public final class InternalTestCluster extends TestCluster { public InternalTestCluster(long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes, - boolean enableHttpPipelining, String nodePrefix) { + boolean enableHttpPipelining, String nodePrefix) { 
super(clusterSeed); this.baseDir = baseDir; this.clusterName = clusterName; @@ -281,8 +281,8 @@ public final class InternalTestCluster extends TestCluster { } builder.put("path.home", baseDir); builder.put("path.repo", baseDir.resolve("repos")); - builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT+100)); - builder.put("http.port", BASE_PORT+101 + "-" + (BASE_PORT+200)); + builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT + 100)); + builder.put("http.port", BASE_PORT + 101 + "-" + (BASE_PORT + 200)); builder.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true); builder.put("node.mode", NODE_MODE); builder.put("http.pipelining", enableHttpPipelining); @@ -1566,20 +1566,7 @@ public final class InternalTestCluster extends TestCluster { if (activeDisruptionScheme != null) { TimeValue expectedHealingTime = activeDisruptionScheme.expectedTimeToHeal(); logger.info("Clearing active scheme {}, expected healing time {}", activeDisruptionScheme, expectedHealingTime); - activeDisruptionScheme.removeFromCluster(this); - // We don't what scheme is picked, certain schemes don't partition the cluster, but process slow, so we need - // to to sleep, cluster health alone doesn't verify if these schemes have been cleared. 
- if (expectedHealingTime != null && expectedHealingTime.millis() > 0) { - try { - Thread.sleep(expectedHealingTime.millis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - assertFalse("cluster failed to form after disruption was healed", client().admin().cluster().prepareHealth() - .setWaitForNodes("" + nodes.size()) - .setWaitForRelocatingShards(0) - .get().isTimedOut()); + activeDisruptionScheme.removeAndEnsureHealthy(this); } activeDisruptionScheme = null; } diff --git a/core/src/test/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/core/src/test/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index d1972688106..c8a0820eac2 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.InternalTestCluster; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -89,6 +90,11 @@ public class BlockClusterStateProcessing extends SingleNodeDisruption { } + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + } + @Override public TimeValue expectedTimeToHeal() { return TimeValue.timeValueMinutes(0); diff --git a/core/src/test/java/org/elasticsearch/test/disruption/LongGCDisruption.java b/core/src/test/java/org/elasticsearch/test/disruption/LongGCDisruption.java index de4532269e4..ad84c0a98cc 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/LongGCDisruption.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/LongGCDisruption.java @@ -21,12 +21,15 @@ package org.elasticsearch.test.disruption; import 
org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.InternalTestCluster; import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.regex.Pattern; +import static org.junit.Assert.assertFalse; + /** * Suspends all threads on the specified node in order to simulate a long gc. */ @@ -63,6 +66,12 @@ public class LongGCDisruption extends SingleNodeDisruption { } } + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + ensureNodeCount(cluster); + } + @Override public TimeValue expectedTimeToHeal() { return TimeValue.timeValueMillis(0); diff --git a/core/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java b/core/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java index 8c379b0a008..174e83e15a4 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/NetworkPartition.java @@ -32,6 +32,8 @@ import java.util.List; import java.util.Random; import java.util.Set; +import static org.junit.Assert.assertFalse; + public abstract class NetworkPartition implements ServiceDisruptionScheme { protected final ESLogger logger = Loggers.getLogger(getClass()); @@ -105,6 +107,19 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme { stopDisrupting(); } + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + ensureNodeCount(cluster); + } + + protected void ensureNodeCount(InternalTestCluster cluster) { + assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth() + .setWaitForNodes("" + cluster.size()) + .setWaitForRelocatingShards(0) + .get().isTimedOut()); + } + @Override public synchronized void applyToNode(String node, InternalTestCluster cluster) { if (!autoExpand || 
nodesSideOne.contains(node) || nodesSideTwo.contains(node)) { diff --git a/core/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java b/core/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java index 7b348b1afea..06bef2105ed 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/NoOpDisruptionScheme.java @@ -59,6 +59,11 @@ public class NoOpDisruptionScheme implements ServiceDisruptionScheme { } + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + + } + @Override public TimeValue expectedTimeToHeal() { return TimeValue.timeValueSeconds(0); diff --git a/core/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java b/core/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java index 70774a82356..b5f3bcacbd6 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/ServiceDisruptionScheme.java @@ -23,20 +23,22 @@ import org.elasticsearch.test.InternalTestCluster; public interface ServiceDisruptionScheme { - public void applyToCluster(InternalTestCluster cluster); + void applyToCluster(InternalTestCluster cluster); - public void removeFromCluster(InternalTestCluster cluster); + void removeFromCluster(InternalTestCluster cluster); - public void applyToNode(String node, InternalTestCluster cluster); + void removeAndEnsureHealthy(InternalTestCluster cluster); - public void removeFromNode(String node, InternalTestCluster cluster); + void applyToNode(String node, InternalTestCluster cluster); - public void startDisrupting(); + void removeFromNode(String node, InternalTestCluster cluster); - public void stopDisrupting(); + void startDisrupting(); - public void testClusterClosed(); + void stopDisrupting(); - public TimeValue expectedTimeToHeal(); + void testClusterClosed(); + + 
TimeValue expectedTimeToHeal(); } diff --git a/core/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java b/core/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java index 3148254011e..f74280c014a 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/SingleNodeDisruption.java @@ -24,6 +24,8 @@ import org.elasticsearch.test.InternalTestCluster; import java.util.Random; +import static org.junit.Assert.assertFalse; + public abstract class SingleNodeDisruption implements ServiceDisruptionScheme { protected final ESLogger logger = Loggers.getLogger(getClass()); @@ -80,4 +82,10 @@ public abstract class SingleNodeDisruption implements ServiceDisruptionScheme { disruptedNode = null; } + protected void ensureNodeCount(InternalTestCluster cluster) { + assertFalse("cluster failed to form after disruption was healed", cluster.client().admin().cluster().prepareHealth() + .setWaitForNodes("" + cluster.size()) + .setWaitForRelocatingShards(0) + .get().isTimedOut()); + } } diff --git a/core/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/core/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 746d7f942ba..430332acaa2 100644 --- a/core/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/core/src/test/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.InternalTestCluster; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -132,6 +133,12 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption { return true; } + @Override + public void 
removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + ensureNodeCount(cluster); + } + @Override public TimeValue expectedTimeToHeal() { return TimeValue.timeValueMillis(0);