diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 102e16691d9..33f683b458c 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -45,7 +45,7 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase { @TestLogging("_root:DEBUG") public void testDelayShards() throws Exception { logger.info("--> starting 3 nodes"); - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); // Wait for all 3 nodes to be up logger.info("--> waiting for 3 nodes to be up"); diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java index aa73eafb496..26a882f0045 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsIT.java @@ -162,7 +162,7 @@ public class ClusterStatsIT extends ESIntegTestCase { } public void testValuesSmokeScreen() throws IOException, ExecutionException, InterruptedException { - internalCluster().startNodesAsync(randomIntBetween(1, 3)).get(); + internalCluster().startNodes(randomIntBetween(1, 3)); index("test1", "type", "1", "f", "f"); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); @@ -202,7 +202,7 @@ public class ClusterStatsIT extends ESIntegTestCase { public void testAllocatedProcessors() throws Exception { // start one node with 7 processors. - internalCluster().startNodesAsync(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build()).get(); + internalCluster().startNode(Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 7).build()); waitForNodes(1); ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get(); diff --git a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index 7c764ed1724..b35aac5f958 100644 --- a/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -75,7 +75,7 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase { .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .build(); - internalCluster().startMasterOnlyNodesAsync(3, sharedSettings).get(); + internalCluster().startMasterOnlyNodes(3, sharedSettings); String dataNode = internalCluster().startDataOnlyNode(sharedSettings); diff --git a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java index 1fd71c7ae51..307b47edacf 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/OldIndexBackwardsCompatibilityIT.java @@ -61,7 +61,6 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.OldIndexUtils; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; @@ -129,24 +128,23 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { } void setupCluster() throws Exception { - InternalTestCluster.Async> replicas = internalCluster().startNodesAsync(1); // for replicas + List replicas = internalCluster().startNodes(1); // for replicas Path baseTempDir = createTempDir(); // start single data path node Settings.Builder nodeSettings = Settings.builder() .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("single-path").toAbsolutePath()) .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master - InternalTestCluster.Async singleDataPathNode = internalCluster().startNodeAsync(nodeSettings.build()); + singleDataPathNodeName = internalCluster().startNode(nodeSettings); // start multi data path node nodeSettings = Settings.builder() .put(Environment.PATH_DATA_SETTING.getKey(), baseTempDir.resolve("multi-path1").toAbsolutePath() + "," + baseTempDir .resolve("multi-path2").toAbsolutePath()) .put(Node.NODE_MASTER_SETTING.getKey(), false); // workaround for dangling index loading issue when node is master - InternalTestCluster.Async multiDataPathNode = internalCluster().startNodeAsync(nodeSettings.build()); + multiDataPathNodeName = internalCluster().startNode(nodeSettings); // find single data path dir - singleDataPathNodeName = singleDataPathNode.get(); Path[] nodePaths = internalCluster().getInstance(NodeEnvironment.class, singleDataPathNodeName).nodeDataPaths(); assertEquals(1, nodePaths.length); singleDataPath = nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER); @@ -155,7 +153,6 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { logger.info("--> Single data path: {}", singleDataPath); // find multi data path dirs - multiDataPathNodeName = multiDataPathNode.get(); nodePaths = internalCluster().getInstance(NodeEnvironment.class, multiDataPathNodeName).nodeDataPaths(); assertEquals(2, nodePaths.length); multiDataPath = new Path[]{nodePaths[0].resolve(NodeEnvironment.INDICES_FOLDER), @@ -165,8 +162,6 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase { Files.createDirectories(multiDataPath[0]); Files.createDirectories(multiDataPath[1]); logger.info("--> Multi data paths: {}, {}", multiDataPath[0], multiDataPath[1]); - - replicas.get(); // wait for replicas } void upgradeIndexFolder() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index d7a62ca7f10..3d3e87db083 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -126,7 +126,7 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { } public void testClusterInfoServiceCollectsInformation() throws Exception { - internalCluster().startNodesAsync(2).get(); + internalCluster().startNodes(2); assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), 0) .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE).build())); @@ -174,10 +174,9 @@ public class ClusterInfoServiceIT extends ESIntegTestCase { } public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { - internalCluster().startNodesAsync(2, + internalCluster().startNodes(2, // manually control publishing - Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()) - .get(); + Settings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m").build()); prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get(); ensureGreen("test"); InternalTestCluster internalTestCluster = internalCluster(); diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 257b80663a3..8f76581ae3f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -24,7 +24,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; @@ -202,22 +201,19 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { .build(); logger.info("--> start first 2 nodes"); - internalCluster().startNodesAsync(2, settings).get(); + internalCluster().startNodes(2, settings); ClusterState state; - assertBusy(new Runnable() { - @Override - public void run() { - for (Client client : clients()) { - ClusterState state = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); - } + assertBusy(() -> { + for (Client client : clients()) { + ClusterState state1 = client.admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + assertThat(state1.blocks().hasGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID), equalTo(true)); } }); logger.info("--> start two more nodes"); - internalCluster().startNodesAsync(2, settings).get(); + internalCluster().startNodes(2, settings); ensureGreen(); ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("4").execute().actionGet(); @@ -252,7 +248,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { assertNoMasterBlockOnAllNodes(); logger.info("--> start back the 2 nodes "); - String[] newNodes = internalCluster().startNodesAsync(2, settings).get().toArray(Strings.EMPTY_ARRAY); + String[] newNodes = internalCluster().startNodes(2, settings).stream().toArray(String[]::new); ensureGreen(); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); @@ -338,7 +334,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { logger.info("--> starting [{}] nodes. min_master_nodes set to [{}]", nodeCount, initialMinMasterNodes); - internalCluster().startNodesAsync(nodeCount, settings.build()).get(); + internalCluster().startNodes(nodeCount, settings.build()); logger.info("--> waiting for nodes to join"); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(nodeCount)).get().isTimedOut()); @@ -371,7 +367,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2) .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "100ms") // speed things up .build(); - internalCluster().startNodesAsync(3, settings).get(); + internalCluster().startNodes(3, settings); ensureGreen(); // ensure cluster state is recovered before we disrupt things final String master = internalCluster().getMasterName(); diff --git a/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java b/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java index 9613128a00b..a21f61ce8af 100644 --- a/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/UpdateSettingsValidationIT.java @@ -27,17 +27,16 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; - import static org.hamcrest.Matchers.equalTo; @ClusterScope(scope= Scope.TEST, numDataNodes =0) public class UpdateSettingsValidationIT extends ESIntegTestCase { public void testUpdateSettingsValidation() throws Exception { - internalCluster().startNodesAsync( + internalCluster().startNodes( Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build(), Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(), Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build() - ).get(); + ); createIndex("test"); NumShards test = getNumShards("test"); diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index 19657b05480..b814716cb47 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -57,7 +57,7 @@ public class AwarenessAllocationIT extends ESIntegTestCase { logger.info("--> starting 2 nodes on the same rack"); - internalCluster().startNodesAsync(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build()).get(); + internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build()); createIndex("test1"); createIndex("test2"); @@ -107,12 +107,12 @@ public class AwarenessAllocationIT extends ESIntegTestCase { .build(); logger.info("--> starting 4 nodes on different zones"); - List nodes = internalCluster().startNodesAsync( + List nodes = internalCluster().startNodes( Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() - ).get(); + ); String A_0 = nodes.get(0); String B_0 = nodes.get(1); String B_1 = nodes.get(2); @@ -153,10 +153,10 @@ public class AwarenessAllocationIT extends ESIntegTestCase { .build(); logger.info("--> starting 2 nodes on zones 'a' & 'b'"); - List nodes = internalCluster().startNodesAsync( + List nodes = internalCluster().startNodes( Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() - ).get(); + ); String A_0 = nodes.get(0); String B_0 = nodes.get(1); client().admin().indices().prepareCreate("test") diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index 9c6a4273a7f..f9f4a136e1c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -85,7 +85,7 @@ public class ClusterRerouteIT extends ESIntegTestCase { } private void rerouteWithCommands(Settings commonSettings) throws Exception { - List nodesIds = internalCluster().startNodesAsync(2, commonSettings).get(); + List nodesIds = internalCluster().startNodes(2, commonSettings); final String node_1 = nodesIds.get(0); final String node_2 = nodesIds.get(1); @@ -304,7 +304,7 @@ public class ClusterRerouteIT extends ESIntegTestCase { } public void testClusterRerouteWithBlocks() throws Exception { - List nodesIds = internalCluster().startNodesAsync(2).get(); + List nodesIds = internalCluster().startNodes(2); logger.info("--> create an index with 1 shard and 0 replicas"); assertAcked(prepareCreate("test-blocks").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))); diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java index 627fc03701c..03866269cae 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/FilteringAllocationIT.java @@ -43,7 +43,7 @@ public class FilteringAllocationIT extends ESIntegTestCase { public void testDecommissionNodeNoReplicas() throws Exception { logger.info("--> starting 2 nodes"); - List nodesIds = internalCluster().startNodesAsync(2).get(); + List nodesIds = internalCluster().startNodes(2); final String node_0 = nodesIds.get(0); final String node_1 = nodesIds.get(1); assertThat(cluster().size(), equalTo(2)); @@ -82,7 +82,7 @@ public class FilteringAllocationIT extends ESIntegTestCase { public void testDisablingAllocationFiltering() throws Exception { logger.info("--> starting 2 nodes"); - List nodesIds = internalCluster().startNodesAsync(2).get(); + List nodesIds = internalCluster().startNodes(2); final String node_0 = nodesIds.get(0); final String node_1 = nodesIds.get(1); assertThat(cluster().size(), equalTo(2)); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java index e4321218983..853f0f65612 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.Collections; import java.util.List; @@ -42,7 +41,7 @@ public class DelayedAllocationIT extends ESIntegTestCase { * get allocated to a free node when the node hosting it leaves the cluster. */ public void testNoDelayedTimeout() throws Exception { - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -61,7 +60,7 @@ public class DelayedAllocationIT extends ESIntegTestCase { * on it before. */ public void testDelayedAllocationNodeLeavesAndComesBack() throws Exception { - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -85,7 +84,7 @@ public class DelayedAllocationIT extends ESIntegTestCase { * though the node hosting the shard is not coming back. */ public void testDelayedAllocationTimesOut() throws Exception { - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -107,7 +106,7 @@ public class DelayedAllocationIT extends ESIntegTestCase { * even though the node it was hosted on will not come back. */ public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception { - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) @@ -133,7 +132,7 @@ public class DelayedAllocationIT extends ESIntegTestCase { * even though the node it was hosted on will not come back. */ public void testDelayedAllocationChangeWithSettingTo0() throws Exception { - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); prepareCreate("test").setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 93326e54db9..86dd2dfe189 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -71,7 +71,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { private void createStaleReplicaScenario() throws Exception { logger.info("--> starting 3 nodes, 1 master, 2 data"); String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); - internalCluster().startDataOnlyNodesAsync(2).get(); + internalCluster().startDataOnlyNodes(2); assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); @@ -267,7 +267,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { public void testNotWaitForQuorumCopies() throws Exception { logger.info("--> starting 3 nodes"); - internalCluster().startNodesAsync(3).get(); + internalCluster().startNodes(3); logger.info("--> creating index with 1 primary and 2 replicas"); assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() .put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get()); @@ -289,7 +289,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { */ public void testForceAllocatePrimaryOnNoDecision() throws Exception { logger.info("--> starting 1 node"); - final String node = internalCluster().startNodeAsync().get(); + final String node = internalCluster().startNode(); logger.info("--> creating index with 1 primary and 0 replicas"); final String indexName = "test-idx"; assertAcked(client().admin().indices() diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 68e8fc9c94e..51ddc0f3fd9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -54,7 +54,7 @@ public class MockDiskUsagesIT extends ESIntegTestCase { } public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { - List nodes = internalCluster().startNodesAsync(3).get(); + List nodes = internalCluster().startNodes(3); // Wait for all 3 nodes to be up assertBusy(new Runnable() { diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index c5fa05d6635..99a7f04e74d 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -43,8 +43,8 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.cluster.service.ClusterStateStatus; import org.elasticsearch.cluster.service.ClusterServiceState; +import org.elasticsearch.cluster.service.ClusterStateStatus; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -187,13 +187,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } private List startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws - ExecutionException, InterruptedException { + ExecutionException, InterruptedException { configureCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode); - List nodes = internalCluster().startNodesAsync(numberOfNodes).get(); + List nodes = internalCluster().startNodes(numberOfNodes); ensureStableCluster(numberOfNodes); // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results - ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing(); + ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing(); if (zenPing instanceof UnicastZenPing) { ((UnicastZenPing) zenPing).clearTemporalResponses(); } @@ -201,16 +201,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } static final Settings DEFAULT_SETTINGS = Settings.builder() - .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly - .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly - .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly - .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this - // value and the time of disruption and does not recover immediately - // when disruption is stop. We should make sure we recover faster - // then the default of 30s, causing ensureGreen and friends to time out + .put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly + .put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly + .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly + .put(TcpTransport.TCP_CONNECT_TIMEOUT.getKey(), "10s") // Network delay disruption waits for the min between this + // value and the time of disruption and does not recover immediately + // when disruption is stop. We should make sure we recover faster + // then the default of 30s, causing ensureGreen and friends to time out - .build(); + .build(); @Override protected Collection> nodePlugins() { @@ -237,10 +237,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("---> configured unicast"); // TODO: Rarely use default settings form some of these Settings nodeSettings = Settings.builder() - .put(settings) - .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes) - .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode) - .build(); + .put(settings) + .put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), numberOfNodes) + .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNode) + .build(); if (discoveryConfig == null) { if (unicastHostsOrdinals == null) { @@ -306,8 +306,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("--> reducing min master nodes to 2"); assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)) - .get()); + .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)) + .get()); String master = internalCluster().getMasterName(); String nonMaster = null; @@ -334,8 +334,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Makes sure that the get request can be executed on each node locally: assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) )); // Everything is stable now, it is now time to simulate evil... @@ -376,7 +376,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } if (!success) { fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n" - + nodeState); + + nodeState); } } @@ -388,8 +388,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all"); client().admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all")) - .get(); + .setTransientSettings(Settings.builder().put(DiscoverySettings.NO_MASTER_BLOCK_SETTING.getKey(), "all")) + .get(); networkDisruption.startDisrupting(); @@ -416,10 +416,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { final List nodes = startCluster(3); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - )); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); ensureGreen(); String isolatedNode = internalCluster().getMasterName(); @@ -440,7 +440,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { for (String node : nodes) { ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkDisruption.expectedTimeToHeal().millis()), - true, node); + true, node); } logger.info("issue a reroute"); @@ -468,8 +468,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } } catch (AssertionError t) { fail("failed comparing cluster state: " + t.getMessage() + "\n" + - "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state + - "\n--- cluster state [" + node + "]: ---\n" + nodeState); + "--- cluster state of node [" + nodes.get(0) + "]: ---\n" + state + + "\n--- cluster state [" + node + "]: ---\n" + nodeState); } } @@ -482,7 +482,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { * This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates */ @TestLogging("_root:DEBUG,org.elasticsearch.action.index:TRACE,org.elasticsearch.action.get:TRACE,discovery:TRACE,org.elasticsearch.cluster.service:TRACE," - + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE") + + "org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.indices.cluster:TRACE") public void testAckedIndexing() throws Exception { final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5; @@ -491,10 +491,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { final List nodes = startCluster(rarely() ? 5 : 3); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - )); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); ensureGreen(); ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme(); @@ -530,7 +530,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries); logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard); IndexResponse response = - client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout); + client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout); assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); ackedDocs.put(id, node); logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node); @@ -584,7 +584,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { disruptionScheme.stopDisrupting(); for (String node : internalCluster().getNodeNames()) { ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + - DISRUPTION_HEALING_OVERHEAD.millis()), true, node); + DISRUPTION_HEALING_OVERHEAD.millis()), true, node); } ensureGreen("test"); @@ -594,7 +594,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size()); for (String id : ackedDocs.keySet()) { assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found", - client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists()); + client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists()); } } catch (AssertionError e) { throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e); @@ -684,7 +684,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Keeps track of the previous and current master when a master node transition took place on each node on the majority side: final Map>> masters = Collections.synchronizedMap(new HashMap>>()); + String>>>()); for (final String node : majoritySide) { masters.put(node, new ArrayList>()); internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() { @@ -694,7 +694,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { DiscoveryNode currentMaster = event.state().nodes().getMasterNode(); if (!Objects.equals(previousMaster, currentMaster)) { logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(), - event.previousState()); + event.previousState()); String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null; String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null; masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName)); @@ -739,17 +739,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // The old master node will send this update + the cluster state where he is flagged as master to the other // nodes that follow the new master. These nodes should ignore this update. internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new - ClusterStateUpdateTask(Priority.IMMEDIATE) { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return ClusterState.builder(currentState).build(); - } + ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return ClusterState.builder(currentState).build(); + } - @Override - public void onFailure(String source, Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failure [{}]", source), e); - } - }); + @Override + public void onFailure(String source, Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("failure [{}]", source), e); + } + }); // Save the new elected master node final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0)); @@ -769,15 +769,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { String nodeName = entry.getKey(); List> recordedMasterTransition = entry.getValue(); assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), - equalTo(2)); + equalTo(2)); assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), - equalTo(oldMasterNode)); + equalTo(oldMasterNode)); assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition - .get(0).v2(), nullValue()); + .get(0).v2(), nullValue()); assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), - nullValue()); + nullValue()); assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]", - recordedMasterTransition.get(1).v2(), equalTo(newMasterNode)); + recordedMasterTransition.get(1).v2(), equalTo(newMasterNode)); } } @@ -789,11 +789,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { List nodes = startCluster(3); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) - ) - .get()); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + ) + .get()); ensureGreen("test"); nodes = new ArrayList<>(nodes); @@ -809,13 +809,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value") - .get(); + .get(); assertThat(indexResponse.getVersion(), equalTo(1L)); logger.info("Verifying if document exists via node[{}]", notIsolatedNode); GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId()) - .setPreference("_local") - .get(); + .setPreference("_local") + .get(); assertThat(getResponse.isExists(), is(true)); assertThat(getResponse.getVersion(), equalTo(1L)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); @@ -828,8 +828,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { for (String node : nodes) { logger.info("Verifying if document exists after isolating node[{}] via node[{}]", isolatedNode, node); getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId()) - .setPreference("_local") - .get(); + .setPreference("_local") + .get(); assertThat(getResponse.isExists(), is(true)); assertThat(getResponse.getVersion(), equalTo(1L)); assertThat(getResponse.getId(), equalTo(indexResponse.getId())); @@ -853,7 +853,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list // includes all the other nodes that have pinged it and the issue doesn't manifest - ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing(); + ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing(); if (zenPing instanceof UnicastZenPing) { ((UnicastZenPing) zenPing).clearTemporalResponses(); } @@ -890,7 +890,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list // includes all the other nodes that have pinged it and the issue doesn't manifest - ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing(); + ZenPing zenPing = ((TestZenDiscovery) internalCluster().getInstance(Discovery.class)).getZenPing(); if (zenPing instanceof UnicastZenPing) { ((UnicastZenPing) zenPing).clearTemporalResponses(); } @@ -928,11 +928,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes(); TransportService masterTranspotService = - internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName()); + internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName()); logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode); MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, - nonMasterNode); + nonMasterNode); nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService); assertNoMaster(nonMasterNode); @@ -951,10 +951,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode); final CountDownLatch countDownLatch = new CountDownLatch(2); nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService - .original()) { + .original()) { @Override public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions - options) throws IOException, TransportException { + options) throws IOException, TransportException { if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) { countDownLatch.countDown(); } @@ -982,16 +982,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { List nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList()); String nonMasterNode = randomFrom(nonMasterNodes); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) - )); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + )); ensureGreen(); String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId(); // fail a random shard ShardRouting failedShard = - randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); + randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode); CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(); @@ -1006,20 +1006,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { networkDisruption.startDisrupting(); service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new - ShardStateAction.Listener() { - @Override - public void onSuccess() { - success.set(true); - latch.countDown(); - } + ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } - @Override - public void onFailure(Exception e) { - success.set(false); - latch.countDown(); - assert false; - } - }); + @Override + public void onFailure(Exception e) { + success.set(false); + latch.countDown(); + assert false; + } + }); if (isolatedNode.equals(nonMasterNode)) { assertNoMaster(nonMasterNode); @@ -1051,11 +1051,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000); // don't wait for initial state, wat want to add the disruption while the cluster is forming.. - internalCluster().startNodesAsync(3, - Settings.builder() - .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "1ms") - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s") - .build()).get(); + internalCluster().startNodes(3, Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "3s").build()); logger.info("applying disruption while cluster is forming ..."); @@ -1084,7 +1080,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode); MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, - masterNode); + masterNode); if (randomBoolean()) { masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode)); } else { @@ -1110,21 +1106,18 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { public void testSearchWithRelocationAndSlowClusterStateProcessing() throws Exception { // don't use DEFAULT settings (which can cause node disconnects on a slow CI machine) configureCluster(Settings.EMPTY, 3, null, 1); - InternalTestCluster.Async masterNodeFuture = internalCluster().startMasterOnlyNodeAsync(); - InternalTestCluster.Async node_1Future = internalCluster().startDataOnlyNodeAsync(); + final String masterNode = internalCluster().startMasterOnlyNode(); + final String node_1 = internalCluster().startDataOnlyNode(); - final String node_1 = node_1Future.get(); - final String masterNode = masterNodeFuture.get(); logger.info("--> creating index [test] with one shard and on replica"); assertAcked(prepareCreate("test").setSettings( - Settings.builder().put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) + Settings.builder().put(indexSettings()) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)) ); ensureGreen("test"); - InternalTestCluster.Async node_2Future = internalCluster().startDataOnlyNodeAsync(); - final String node_2 = node_2Future.get(); + final String node_2 = internalCluster().startDataOnlyNode(); List indexRequestBuilderList = new ArrayList<>(); for (int i = 0; i < 100; i++) { indexRequestBuilderList.add(client().prepareIndex().setIndex("test").setType("doc").setSource("{\"int_field\":1}")); @@ -1137,7 +1130,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { CountDownLatch beginRelocationLatch = new CountDownLatch(1); CountDownLatch endRelocationLatch = new CountDownLatch(1); transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, - endRelocationLatch)); + endRelocationLatch)); internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get(); // wait for relocation to start beginRelocationLatch.await(); @@ -1176,21 +1169,19 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { */ public void testIndicesDeleted() throws Exception { final Settings settings = Settings.builder() - .put(DEFAULT_SETTINGS) - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node - .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed - .build(); + .put(DEFAULT_SETTINGS) + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed + .build(); final String idxName = "test"; configureCluster(settings, 3, null, 2); - InternalTestCluster.Async> masterNodes = internalCluster().startMasterOnlyNodesAsync(2); - InternalTestCluster.Async dataNode = internalCluster().startDataOnlyNodeAsync(); - dataNode.get(); - final List allMasterEligibleNodes = masterNodes.get(); + final List allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(2); + final String dataNode = internalCluster().startDataOnlyNode(); ensureStableCluster(3); assertAcked(prepareCreate("test")); final String masterNode1 = internalCluster().getMasterName(); - NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode.get()), + NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(masterNode1, dataNode), new NetworkUnresponsive()); internalCluster().setDisruptionScheme(networkDisruption); networkDisruption.startDisrupting(); @@ -1202,7 +1193,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { for (String masterNode : allMasterEligibleNodes) { final ClusterServiceState masterState = internalCluster().clusterService(masterNode).clusterServiceState(); assertTrue("index not deleted on " + masterNode, masterState.getClusterState().metaData().hasIndex(idxName) == false && - masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED); + masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED); } }); internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK); @@ -1212,21 +1203,21 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { public void testElectMasterWithLatestVersion() throws Exception { configureCluster(3, null, 2); - final Set nodes = new HashSet<>(internalCluster().startNodesAsync(3).get()); + final Set nodes = new HashSet<>(internalCluster().startNodes(3)); ensureStableCluster(3); ServiceDisruptionScheme isolateAllNodes = new NetworkDisruption(new NetworkDisruption.IsolateAllNodes(nodes), new NetworkDisconnect()); internalCluster().setDisruptionScheme(isolateAllNodes); logger.info("--> forcing a complete election to make sure \"preferred\" master is elected"); isolateAllNodes.startDisrupting(); - for (String node: nodes) { + for (String node : nodes) { assertNoMaster(node); } isolateAllNodes.stopDisrupting(); ensureStableCluster(3); final String preferredMasterName = internalCluster().getMasterName(); final DiscoveryNode preferredMaster = internalCluster().clusterService(preferredMasterName).localNode(); - for (String node: nodes) { + for (String node : nodes) { DiscoveryNode discoveryNode = internalCluster().clusterService(node).localNode(); assertThat(discoveryNode.getId(), greaterThanOrEqualTo(preferredMaster.getId())); } @@ -1252,7 +1243,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { logger.info("--> forcing a complete election again"); isolateAllNodes.startDisrupting(); - for (String node: nodes) { + for (String node : nodes) { assertNoMaster(node); } @@ -1298,10 +1289,17 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } final NetworkLinkDisruptionType disruptionType; switch (randomInt(2)) { - case 0: disruptionType = new NetworkUnresponsive(); break; - case 1: disruptionType = new NetworkDisconnect(); break; - case 2: disruptionType = NetworkDelay.random(random()); break; - default: throw new IllegalArgumentException(); + case 0: + disruptionType = new NetworkUnresponsive(); + break; + case 1: + disruptionType = new NetworkDisconnect(); + break; + case 2: + disruptionType = NetworkDelay.random(random()); + break; + default: + throw new IllegalArgumentException(); } final ServiceDisruptionScheme scheme; if (rarely()) { @@ -1334,7 +1332,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { if (expectedBlocks != null) { for (ClusterBlockLevel level : expectedBlocks.levels()) { assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock - (level)); + (level)); } } } @@ -1352,7 +1350,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { } logger.trace("[{}] master is [{}]", node, state.nodes().getMasterNode()); assertThat("node [" + node + "] still has [" + masterNode + "] as master", - oldMasterNode, not(equalTo(masterNode))); + oldMasterNode, not(equalTo(masterNode))); } }, 10, TimeUnit.SECONDS); } @@ -1372,12 +1370,12 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { private void assertDiscoveryCompleted(List nodes) throws InterruptedException { for (final String node : nodes) { assertTrue( - "node [" + node + "] is still joining master", - awaitBusy( - () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(), - 30, - TimeUnit.SECONDS - ) + "node [" + node + "] is still joining master", + awaitBusy( + () -> !((ZenDiscovery) internalCluster().getInstance(Discovery.class, node)).joiningCluster(), + 30, + TimeUnit.SECONDS + ) ); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java deleted file mode 100644 index b708ab4c26a..00000000000 --- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery; - -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESIntegTestCase.ClusterScope; -import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration; -import org.junit.Before; - -import java.util.List; -import java.util.concurrent.ExecutionException; - -import static org.hamcrest.Matchers.equalTo; - -@ClusterScope(scope = Scope.TEST, numDataNodes = 0, autoMinMasterNodes = false) -public class ZenUnicastDiscoveryIT extends ESIntegTestCase { - - private ClusterDiscoveryConfiguration discoveryConfig; - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return discoveryConfig.nodeSettings(nodeOrdinal); - } - - @Before - public void clearConfig() { - discoveryConfig = null; - } - - public void testNormalClusterForming() throws ExecutionException, InterruptedException { - int currentNumNodes = randomIntBetween(3, 5); - - // use explicit unicast hosts so we can start those first - int[] unicastHostOrdinals = new int[randomIntBetween(1, currentNumNodes)]; - for (int i = 0; i < unicastHostOrdinals.length; i++) { - unicastHostOrdinals[i] = i; - } - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals); - - // start the unicast hosts - internalCluster().startNodesAsync(unicastHostOrdinals.length).get(); - - // start the rest of the cluster - internalCluster().startNodesAsync(currentNumNodes - unicastHostOrdinals.length).get(); - - if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) { - logger.info("cluster forming timed out, cluster state:\n{}", client().admin().cluster().prepareState().get().getState()); - fail("timed out waiting for cluster to form with [" + currentNumNodes + "] nodes"); - } - } - - // Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this - // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N - // can't be satisfied. - public void testMinimumMasterNodes() throws Exception { - int currentNumNodes = randomIntBetween(3, 5); - final int min_master_nodes = currentNumNodes / 2 + 1; - int currentNumOfUnicastHosts = randomIntBetween(min_master_nodes, currentNumNodes); - final Settings settings = Settings.builder() - .put("discovery.zen.join_timeout", TimeValue.timeValueSeconds(10)) - .put("discovery.zen.minimum_master_nodes", min_master_nodes) - .build(); - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings); - - List nodes = internalCluster().startNodesAsync(currentNumNodes).get(); - - ensureStableCluster(currentNumNodes); - - DiscoveryNode masterDiscoNode = null; - for (String node : nodes) { - ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.nodes().getSize(), equalTo(currentNumNodes)); - if (masterDiscoNode == null) { - masterDiscoNode = state.nodes().getMasterNode(); - } else { - assertThat(masterDiscoNode.equals(state.nodes().getMasterNode()), equalTo(true)); - } - } - } -} diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index 6856d05365a..27fb48f764c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -80,12 +80,12 @@ public class ZenDiscoveryIT extends ESIntegTestCase { .put(Node.NODE_DATA_SETTING.getKey(), false) .put(defaultSettings) .build(); - internalCluster().startNodesAsync(2, masterNodeSettings).get(); + internalCluster().startNodes(2, masterNodeSettings); Settings dateNodeSettings = Settings.builder() .put(Node.NODE_MASTER_SETTING.getKey(), false) .put(defaultSettings) .build(); - internalCluster().startNodesAsync(2, dateNodeSettings).get(); + internalCluster().startNodes(2, dateNodeSettings); ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth() .setWaitForEvents(Priority.LANGUID) .setWaitForNodes("4") @@ -100,13 +100,10 @@ public class ZenDiscoveryIT extends ESIntegTestCase { final String oldMaster = internalCluster().getMasterName(); internalCluster().stopCurrentMasterNode(); - assertBusy(new Runnable() { - @Override - public void run() { - String current = internalCluster().getMasterName(); - assertThat(current, notNullValue()); - assertThat(current, not(equalTo(oldMaster))); - } + assertBusy(() -> { + String current = internalCluster().getMasterName(); + assertThat(current, notNullValue()); + assertThat(current, not(equalTo(oldMaster))); }); ensureSearchable("test"); @@ -130,7 +127,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { .put(Node.NODE_MASTER_SETTING.getKey(), false) .put(defaultSettings) .build(); - internalCluster().startNodesAsync(2, dateNodeSettings).get(); + internalCluster().startNodes(2, dateNodeSettings); client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class, master); @@ -155,8 +152,7 @@ public class ZenDiscoveryIT extends ESIntegTestCase { } public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception { - List nodeNames = internalCluster().startNodesAsync(2).get(); - client().admin().cluster().prepareHealth().setWaitForNodes("2").get(); + List nodeNames = internalCluster().startNodes(2); List nonMasterNodes = new ArrayList<>(nodeNames); nonMasterNodes.remove(internalCluster().getMasterName()); diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index bed21193ac6..22f06b9098d 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -94,7 +94,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { public void testSimpleOpenClose() throws Exception { logger.info("--> starting 2 nodes"); - internalCluster().startNodesAsync(2).get(); + internalCluster().startNodes(2); logger.info("--> creating test index"); createIndex("test"); @@ -237,7 +237,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { logger.info("--> cleaning nodes"); logger.info("--> starting 2 nodes"); - internalCluster().startNodesAsync(2).get(); + internalCluster().startNodes(2); logger.info("--> indexing a simple document"); client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefreshPolicy(IMMEDIATE).get(); @@ -277,7 +277,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { public void testDanglingIndices() throws Exception { logger.info("--> starting two nodes"); - final String node_1 = internalCluster().startNodesAsync(2).get().get(0); + final String node_1 = internalCluster().startNodes(2).get(0); logger.info("--> indexing a simple document"); client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefreshPolicy(IMMEDIATE).get(); @@ -331,7 +331,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { if (randomBoolean()) { // test with a regular index logger.info("--> starting a cluster with " + numNodes + " nodes"); - nodes = internalCluster().startNodesAsync(numNodes).get(); + nodes = internalCluster().startNodes(numNodes); logger.info("--> create an index"); createIndex(indexName); } else { @@ -344,7 +344,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase { .put(Environment.PATH_SHARED_DATA_SETTING.getKey(), dataPath.toString()) .put("index.store.fs.fs_lock", randomFrom("native", "simple")) .build(); - nodes = internalCluster().startNodesAsync(numNodes, nodeSettings).get(); + nodes = internalCluster().startNodes(numNodes, nodeSettings); logger.info("--> create a shadow replica index"); createShadowReplicaIndex(indexName, dataPath, numNodes - 1); } diff --git a/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java b/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java index 3c2917f38e8..3dfeca3053e 100644 --- a/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback; import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedHashMap; +import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -57,10 +58,9 @@ public class MetaDataWriteDataNodesIT extends ESIntegTestCase { public void testMetaIsRemovedIfAllShardsFromIndexRemoved() throws Exception { // this test checks that the index state is removed from a data only node once all shards have been allocated away from it String masterNode = internalCluster().startMasterOnlyNode(Settings.EMPTY); - InternalTestCluster.Async nodeName1 = internalCluster().startDataOnlyNodeAsync(); - InternalTestCluster.Async nodeName2 = internalCluster().startDataOnlyNodeAsync(); - String node1 = nodeName1.get(); - String node2 = nodeName2.get(); + List nodeNames= internalCluster().startDataOnlyNodes(2); + String node1 = nodeNames.get(0); + String node2 = nodeNames.get(1); String index = "index"; assertAcked(prepareCreate(index).setSettings(Settings.builder().put("index.number_of_replicas", 0).put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node1))); diff --git a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index 8284388d2ce..3abaff32959 100644 --- a/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -46,8 +46,7 @@ public class QuorumGatewayIT extends ESIntegTestCase { public void testQuorumRecovery() throws Exception { logger.info("--> starting 3 nodes"); // we are shutting down nodes - make sure we don't have 2 clusters if we test network - internalCluster().startNodesAsync(3).get(); - + internalCluster().startNodes(3); createIndex("test"); ensureGreen(); diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 052bfc00ef2..431b592fac9 100644 --- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -316,7 +316,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { public void testLatestVersionLoaded() throws Exception { // clean two nodes - internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get(); + internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()); client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet(); client().admin().indices().prepareFlush().execute().actionGet(); @@ -366,7 +366,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("--> starting the two nodes back"); - internalCluster().startNodesAsync(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()).get(); + internalCluster().startNodes(2, Settings.builder().put("gateway.recover_after_nodes", 2).build()); logger.info("--> running cluster_health (wait for the shards to startup)"); ensureGreen(); @@ -392,7 +392,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4) .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build(); - internalCluster().startNodesAsync(4, settings).get(); + internalCluster().startNodes(4, settings); // prevent any rebalance actions during the peer recovery // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if // we reuse the files on disk after full restarts for replicas. diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index a335a42edb6..d5a003003ac 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -110,7 +110,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { public void testCannotCreateWithBadPath() throws Exception { Settings nodeSettings = nodeSettings("/badpath"); - internalCluster().startNodesAsync(1, nodeSettings).get(); + internalCluster().startNodes(1, nodeSettings); Settings idxSettings = Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_DATA_PATH, "/etc/foo") @@ -132,7 +132,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { final Path dataPath = createTempDir(); Settings nodeSettings = nodeSettings(dataPath); - internalCluster().startNodesAsync(3, nodeSettings).get(); + internalCluster().startNodes(3, nodeSettings); Settings idxSettings = Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build(); @@ -189,7 +189,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { final Path dataPath = createTempDir(); Settings nodeSettings = nodeSettings(dataPath); - internalCluster().startNodesAsync(3, nodeSettings).get(); + internalCluster().startNodes(3, nodeSettings); final String IDX = "test"; Settings idxSettings = Settings.builder() @@ -552,7 +552,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { final int nodeCount = randomIntBetween(2, 5); logger.info("--> starting {} nodes", nodeCount); - final List nodes = internalCluster().startNodesAsync(nodeCount, nodeSettings).get(); + final List nodes = internalCluster().startNodes(nodeCount, nodeSettings); final String IDX = "test"; final Tuple numPrimariesAndReplicas = randomPrimariesAndReplicas(nodeCount); final int numPrimaries = numPrimariesAndReplicas.v1(); @@ -605,7 +605,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { Path dataPath = createTempDir(); Settings nodeSettings = nodeSettings(dataPath); - final List nodes = internalCluster().startNodesAsync(2, nodeSettings).get(); + final List nodes = internalCluster().startNodes(2, nodeSettings); String IDX = "test"; Settings idxSettings = Settings.builder() @@ -661,7 +661,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { Path dataPath = createTempDir(); Settings nodeSettings = nodeSettings(dataPath); - internalCluster().startNodesAsync(3, nodeSettings).get(); + internalCluster().startNodes(3, nodeSettings); String IDX = "test"; Settings idxSettings = Settings.builder() @@ -731,10 +731,9 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { Settings fooSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "foo").build(); Settings barSettings = Settings.builder().put(nodeSettings).put("node.attr.affinity", "bar").build(); - final InternalTestCluster.Async> fooNodes = internalCluster().startNodesAsync(2, fooSettings); - final InternalTestCluster.Async> barNodes = internalCluster().startNodesAsync(2, barSettings); - fooNodes.get(); - barNodes.get(); + List allNodes = internalCluster().startNodes(fooSettings, fooSettings, barSettings, barSettings); + List fooNodes = allNodes.subList(0, 2); + List barNodes = allNodes.subList(2, 4); String IDX = "test"; Settings includeFoo = Settings.builder() @@ -768,27 +767,27 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeBar).get(); // wait for the shards to move from "foo" nodes to "bar" nodes - assertNoShardsOn(fooNodes.get()); + assertNoShardsOn(fooNodes); // put shards back on "foo" client().admin().indices().prepareUpdateSettings(IDX).setSettings(includeFoo).get(); // wait for the shards to move from "bar" nodes to "foo" nodes - assertNoShardsOn(barNodes.get()); + assertNoShardsOn(barNodes); // Stop a foo node logger.info("--> stopping first 'foo' node"); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(0))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(0))); // Ensure that the other foo node has all the shards now - assertShardCountOn(fooNodes.get().get(1), 5); + assertShardCountOn(fooNodes.get(1), 5); // Assert no shards on the "bar" nodes - assertNoShardsOn(barNodes.get()); + assertNoShardsOn(barNodes); // Stop the second "foo" node logger.info("--> stopping second 'foo' node"); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get().get(1))); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(fooNodes.get(1))); // The index should still be able to be allocated (on the "bar" nodes), // all the "foo" nodes are gone @@ -799,7 +798,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { String newFooNode = internalCluster().startNode(fooSettings); assertShardCountOn(newFooNode, 5); - assertNoShardsOn(barNodes.get()); + assertNoShardsOn(barNodes); } public void testDeletingClosedIndexRemovesFiles() throws Exception { @@ -808,7 +807,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { final int numNodes = randomIntBetween(2, 5); logger.info("--> starting {} nodes", numNodes); - final List nodes = internalCluster().startNodesAsync(numNodes, nodeSettings).get(); + final List nodes = internalCluster().startNodes(numNodes, nodeSettings); final String IDX = "test"; final Tuple numPrimariesAndReplicas = randomPrimariesAndReplicas(numNodes); final int numPrimaries = numPrimariesAndReplicas.v1(); @@ -851,7 +850,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { Path dataPath = createTempDir(); Settings nodeSettings = nodeSettings(dataPath); - internalCluster().startNodesAsync(2, nodeSettings).get(); + internalCluster().startNodes(2, nodeSettings); String IDX = "test"; Settings idxSettings = Settings.builder() @@ -868,7 +867,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase { client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); flushAndRefresh(IDX); - internalCluster().startNodesAsync(1).get(); + internalCluster().startNodes(1); ensureYellow(IDX); final ClusterHealthResponse clusterHealth = client().admin().cluster() diff --git a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java index 7d658a2a591..1fe6be53466 100644 --- a/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java @@ -72,7 +72,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase { } public void testCorruptTranslogFiles() throws Exception { - internalCluster().startNodesAsync(1, Settings.EMPTY).get(); + internalCluster().startNodes(1, Settings.EMPTY); assertAcked(prepareCreate("test").setSettings(Settings.builder() .put("index.number_of_shards", 1) diff --git a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java index ff4de23a069..bb8943f19d0 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java @@ -28,7 +28,6 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.Lock; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -47,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; -import org.elasticsearch.index.translog.TruncateTranslogCommand; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -85,7 +83,7 @@ public class TruncateTranslogIT extends ESIntegTestCase { } public void testCorruptTranslogTruncation() throws Exception { - internalCluster().startNodesAsync(1, Settings.EMPTY).get(); + internalCluster().startNodes(1, Settings.EMPTY); assertAcked(prepareCreate("test").setSettings(Settings.builder() .put("index.number_of_shards", 1) diff --git a/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java b/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java index 81c9d12fbb4..f8fe05bc97b 100644 --- a/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java +++ b/core/src/test/java/org/elasticsearch/indices/mapping/DedicatedMasterGetFieldMappingIT.java @@ -34,7 +34,7 @@ public class DedicatedMasterGetFieldMappingIT extends SimpleGetFieldMappingsIT { Settings settings = Settings.builder() .put(Node.NODE_DATA_SETTING.getKey(), false) .build(); - internalCluster().startNodesAsync(settings, Settings.EMPTY).get(); + internalCluster().startNodes(settings, Settings.EMPTY); } } diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index fe3b569755d..c38c20e0c25 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -43,7 +43,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoverySettings; -import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -174,7 +173,7 @@ public class RareClusterStateIT extends ESIntegTestCase { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/14932") public void testDeleteCreateInOneBulk() throws Exception { - internalCluster().startNodesAsync(2).get(); + internalCluster().startNodes(2); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); prepareCreate("test").setSettings(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, true).addMapping("type").get(); ensureGreen("test"); @@ -213,7 +212,7 @@ public class RareClusterStateIT extends ESIntegTestCase { .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design .build(); - final List nodeNames = internalCluster().startNodesAsync(2, settings).get(); + final List nodeNames = internalCluster().startNodes(2, settings); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); final String master = internalCluster().getMasterName(); @@ -328,11 +327,11 @@ public class RareClusterStateIT extends ESIntegTestCase { // Here we want to test that everything goes well if the mappings that // are needed for a document are not available on the replica at the // time of indexing it - final List nodeNames = internalCluster().startNodesAsync(2, + final List nodeNames = internalCluster().startNodes(2, Settings.builder() .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design - .build()).get(); + .build()); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); final String master = internalCluster().getMasterName(); diff --git a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index a6420034c42..d8e7e7c4ac3 100644 --- a/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -292,17 +292,14 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { } public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception { - InternalTestCluster.Async masterFuture = internalCluster().startNodeAsync( - Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), true, Node.NODE_DATA_SETTING.getKey(), false).build()); - InternalTestCluster.Async> nodesFutures = internalCluster().startNodesAsync(4, - Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false, Node.NODE_DATA_SETTING.getKey(), true).build()); + final String masterNode = internalCluster().startMasterOnlyNode(); + final List nodes = internalCluster().startDataOnlyNodes(4); - final String masterNode = masterFuture.get(); - final String node1 = nodesFutures.get().get(0); - final String node2 = nodesFutures.get().get(1); - final String node3 = nodesFutures.get().get(2); + final String node1 = nodes.get(0); + final String node2 = nodes.get(1); + final String node3 = nodes.get(2); // we will use this later on, handy to start now to make sure it has a different data folder that node 1,2 &3 - final String node4 = nodesFutures.get().get(3); + final String node4 = nodes.get(3); assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(indexSettings()) @@ -356,8 +353,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { logger.debug("--> starting the two old nodes back"); - internalCluster().startNodesAsync(2, - Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false, Node.NODE_DATA_SETTING.getKey(), true).build()); + internalCluster().startDataOnlyNodes(2); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("5").get().isTimedOut()); @@ -372,7 +368,7 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase { } public void testShardActiveElseWhere() throws Exception { - List nodes = internalCluster().startNodesAsync(2).get(); + List nodes = internalCluster().startNodes(2); final String masterNode = internalCluster().getMasterName(); final String nonMasterNode = nodes.get(0).equals(masterNode) ? nodes.get(1) : nodes.get(0); diff --git a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java index 2147cea696d..f1dba4e58c6 100644 --- a/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java +++ b/core/src/test/java/org/elasticsearch/nodesinfo/SimpleNodesInfoIT.java @@ -40,7 +40,7 @@ import static org.hamcrest.Matchers.notNullValue; public class SimpleNodesInfoIT extends ESIntegTestCase { public void testNodesInfos() throws Exception { - List nodesIds = internalCluster().startNodesAsync(2).get(); + List nodesIds = internalCluster().startNodes(2); final String node_1 = nodesIds.get(0); final String node_2 = nodesIds.get(1); @@ -79,7 +79,7 @@ public class SimpleNodesInfoIT extends ESIntegTestCase { } public void testNodesInfosTotalIndexingBuffer() throws Exception { - List nodesIds = internalCluster().startNodesAsync(2).get(); + List nodesIds = internalCluster().startNodes(2); final String node_1 = nodesIds.get(0); final String node_2 = nodesIds.get(1); @@ -113,11 +113,10 @@ public class SimpleNodesInfoIT extends ESIntegTestCase { } public void testAllocatedProcessors() throws Exception { - List nodesIds = internalCluster(). - startNodesAsync( + List nodesIds = internalCluster().startNodes( Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 3).build(), Settings.builder().put(EsExecutors.PROCESSORS_SETTING.getKey(), 6).build() - ).get(); + ); final String node_1 = nodesIds.get(0); final String node_2 = nodesIds.get(1); diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index dc388677050..50035e1027b 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -126,7 +126,7 @@ public class FullRollingRestartIT extends ESIntegTestCase { public void testNoRebalanceOnRollingRestart() throws Exception { // see https://github.com/elastic/elasticsearch/issues/14387 internalCluster().startMasterOnlyNode(Settings.EMPTY); - internalCluster().startDataOnlyNodesAsync(3).get(); + internalCluster().startDataOnlyNodes(3); /** * We start 3 nodes and a dedicated master. Restart on of the data-nodes and ensure that we got no relocations. * Yet we have 6 shards 0 replica so that means if the restarting node comes back both other nodes are subject diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index cd93a9fef20..2017b2796b4 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -470,7 +470,7 @@ public class RelocationIT extends ESIntegTestCase { Stream.generate(() -> Settings.builder().put("node.attr.color", "blue").build()).limit(halfNodes), Stream.generate(() -> Settings.builder().put("node.attr.color", "red").build()).limit(halfNodes) ).toArray(Settings[]::new); - List nodes = internalCluster().startNodesAsync(nodeSettings).get(); + List nodes = internalCluster().startNodes(nodeSettings); String[] blueNodes = nodes.subList(0, halfNodes).stream().toArray(String[]::new); String[] redNodes = nodes.subList(halfNodes, nodes.size()).stream().toArray(String[]::new); logger.info("blue nodes: {}", (Object)blueNodes); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 2375b7519cc..b1c46b35b62 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -621,7 +621,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception { Settings nodeSettings = Settings.builder().put().build(); logger.info("--> start two nodes"); - internalCluster().startNodesAsync(2, nodeSettings).get(); + internalCluster().startNodes(2, nodeSettings); // Register mock repositories client().admin().cluster().preparePutRepository("test-repo") .setType("mock").setSettings(Settings.builder() diff --git a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java index 9e17ca21868..7f977592e8d 100644 --- a/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java +++ b/plugins/discovery-azure-classic/src/test/java/org/elasticsearch/discovery/azure/classic/AzureDiscoveryClusterFormationTests.java @@ -64,8 +64,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; - @ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0) @SuppressForbidden(reason = "use http server") // TODO this should be a IT but currently all ITs in this project run against a real cluster @@ -269,7 +267,7 @@ public class AzureDiscoveryClusterFormationTests extends ESIntegTestCase { // only wait for the cluster to form ensureClusterSizeConsistency(); // add one more node and wait for it to join - internalCluster().startDataOnlyNodeAsync().get(); + internalCluster().startDataOnlyNode(); ensureClusterSizeConsistency(); } } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java index 693e765ac2d..b4a1f55a3c6 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryClusterFormationTests.java @@ -243,7 +243,7 @@ public class Ec2DiscoveryClusterFormationTests extends ESIntegTestCase { // only wait for the cluster to form assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get()); // add one more node and wait for it to join - internalCluster().startDataOnlyNodeAsync().get(); + internalCluster().startDataOnlyNode(); assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get()); } } diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java index 1512da2429f..76d7c6408d5 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java @@ -40,7 +40,6 @@ import org.junit.BeforeClass; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -200,7 +199,7 @@ public class GceDiscoverTests extends ESIntegTestCase { // only wait for the cluster to form assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get()); // add one more node and wait for it to join - internalCluster().startDataOnlyNodeAsync().get(); + internalCluster().startDataOnlyNode(); assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0b121b4aa6a..2321690bee9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -130,7 +130,6 @@ import java.util.stream.Stream; import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; -import static org.elasticsearch.discovery.DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -696,10 +695,6 @@ public final class InternalTestCluster extends TestCluster { ensureOpen(); // currently unused Builder builder = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false) .put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_INGEST_SETTING.getKey(), false); - if (size() == 0) { - // if we are the first node - don't wait for a state - builder.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), 0); - } return startNode(builder); } @@ -791,6 +786,10 @@ public final class InternalTestCluster extends TestCluster { return nodeAndClientId; } + public String getName() { + return name; + } + public boolean isMasterEligible() { return Node.NODE_MASTER_SETTING.get(node.settings()); } @@ -887,9 +886,6 @@ public final class InternalTestCluster extends TestCluster { assert ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(newSettings.build()) == false : "min master nodes is auto managed"; newSettings.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minMasterNodes).build(); } - - // validation is (optionally) done in fullRestart/rollingRestart - newSettings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); if (clearDataIfNeeded) { clearDataIfNeeded(callback); } @@ -1018,10 +1014,6 @@ public final class InternalTestCluster extends TestCluster { final Settings.Builder settings = Settings.builder(); settings.put(Node.NODE_MASTER_SETTING.getKey(), true); settings.put(Node.NODE_DATA_SETTING.getKey(), false); - if (autoManageMinMasterNodes) { - settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end - } - NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); toStartAndPublish.add(nodeAndClient); } @@ -1032,9 +1024,6 @@ public final class InternalTestCluster extends TestCluster { settings.put(Node.NODE_MASTER_SETTING.getKey(), false).build(); settings.put(Node.NODE_DATA_SETTING.getKey(), true).build(); } - if (autoManageMinMasterNodes) { - settings.put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s"); // we wait at the end - } NodeAndClient nodeAndClient = buildNode(i, sharedNodesSeeds[i], settings.build(), true, defaultMinMasterNodes); toStartAndPublish.add(nodeAndClient); } @@ -1347,10 +1336,18 @@ public final class InternalTestCluster extends TestCluster { // special case for 1 node master - we can't update the min master nodes before we add more nodes. updateMinMasterNodes(currentMasters + newMasters); } - for (NodeAndClient nodeAndClient : nodeAndClients) { - nodeAndClient.startNode(); - publishNode(nodeAndClient); + List> futures = nodeAndClients.stream().map(node -> executor.submit(node::startNode)).collect(Collectors.toList()); + try { + for (Future future : futures) { + future.get(); + } + } catch (InterruptedException e) { + throw new AssertionError("interrupted while starting nodes", e); + } catch (ExecutionException e) { + throw new RuntimeException("failed to start nodes", e); } + nodeAndClients.forEach(this::publishNode); + if (autoManageMinMasterNodes && currentMasters == 1 && newMasters > 0) { // update once masters have joined validateClusterFormed(); @@ -1535,13 +1532,7 @@ public final class InternalTestCluster extends TestCluster { nodeAndClient.recreateNodeOnRestart(callback, false, autoManageMinMasterNodes ? getMinMasterNodes(getMasterNodesCount()) : -1); } - for (NodeAndClient nodeAndClient : startUpOrder) { - logger.info("starting node [{}] ", nodeAndClient.name); - nodeAndClient.startNode(); - if (activeDisruptionScheme != null) { - activeDisruptionScheme.applyToNode(nodeAndClient.name, this); - } - } + startAndPublishNodesAndClients(startUpOrder); if (callback.validateClusterForming()) { validateClusterFormed(); @@ -1635,6 +1626,61 @@ public final class InternalTestCluster extends TestCluster { return buildNode.name; } + /** + * Starts multiple nodes with default settings and returns their names + */ + public synchronized List startNodes(int numOfNodes) { + return startNodes(numOfNodes, Settings.EMPTY); + } + + /** + * Starts multiple nodes with the given settings and returns their names + */ + public synchronized List startNodes(int numOfNodes, Settings settings) { + return startNodes(Collections.nCopies(numOfNodes, settings).stream().toArray(Settings[]::new)); + } + + /** + * Starts multiple nodes with the given settings and returns their names + */ + public synchronized List startNodes(Settings... settings) { + final int defaultMinMasterNodes; + if (autoManageMinMasterNodes) { + int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count(); + defaultMinMasterNodes = getMinMasterNodes(getMasterNodesCount() + mastersDelta); + } else { + defaultMinMasterNodes = -1; + } + List nodes = new ArrayList<>(); + for (Settings nodeSettings: settings) { + nodes.add(buildNode(nodeSettings, defaultMinMasterNodes)); + } + startAndPublishNodesAndClients(nodes); + if (autoManageMinMasterNodes) { + validateClusterFormed(); + } + + return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList()); + } + + public synchronized List startMasterOnlyNodes(int numNodes) { + return startMasterOnlyNodes(numNodes, Settings.EMPTY); + } + + public synchronized List startMasterOnlyNodes(int numNodes, Settings settings) { + Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build(); + return startNodes(numNodes, settings1); + } + + public synchronized List startDataOnlyNodes(int numNodes) { + return startDataOnlyNodes(numNodes, Settings.EMPTY); + } + + public synchronized List startDataOnlyNodes(int numNodes, Settings settings) { + Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); + return startNodes(numNodes, settings1); + } + /** * updates the min master nodes setting in the current running cluster. * @@ -1667,31 +1713,8 @@ public final class InternalTestCluster extends TestCluster { return (int)nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); } - public synchronized Async> startMasterOnlyNodesAsync(int numNodes) { - return startMasterOnlyNodesAsync(numNodes, Settings.EMPTY); - } - - public synchronized Async> startMasterOnlyNodesAsync(int numNodes, Settings settings) { - Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build(); - return startNodesAsync(numNodes, settings1); - } - - public synchronized Async> startDataOnlyNodesAsync(int numNodes) { - return startDataOnlyNodesAsync(numNodes, Settings.EMPTY); - } - - public synchronized Async> startDataOnlyNodesAsync(int numNodes, Settings settings) { - Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); - return startNodesAsync(numNodes, settings1); - } - - public synchronized Async startMasterOnlyNodeAsync() { - return startMasterOnlyNodeAsync(Settings.EMPTY); - } - - public synchronized Async startMasterOnlyNodeAsync(Settings settings) { - Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), true).put(Node.NODE_DATA_SETTING.getKey(), false).build(); - return startNodeAsync(settings1); + public synchronized String startMasterOnlyNode() { + return startMasterOnlyNode(Settings.EMPTY); } public synchronized String startMasterOnlyNode(Settings settings) { @@ -1699,109 +1722,14 @@ public final class InternalTestCluster extends TestCluster { return startNode(settings1); } - public synchronized Async startDataOnlyNodeAsync() { - return startDataOnlyNodeAsync(Settings.EMPTY); + public synchronized String startDataOnlyNode() { + return startDataOnlyNode(Settings.EMPTY); } - - public synchronized Async startDataOnlyNodeAsync(Settings settings) { - Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); - return startNodeAsync(settings1); - } - public synchronized String startDataOnlyNode(Settings settings) { Settings settings1 = Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_DATA_SETTING.getKey(), true).build(); return startNode(settings1); } - /** - * Starts a node in an async manner with the given settings and returns future with its name. - */ - public synchronized Async startNodeAsync() { - return startNodeAsync(Settings.EMPTY); - } - - /** - * Starts a node in an async manner with the given settings and returns future with its name. - */ - public synchronized Async startNodeAsync(final Settings settings) { - final int defaultMinMasterNodes; - if (autoManageMinMasterNodes) { - int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? 1 : 0; - defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); - } else { - defaultMinMasterNodes = -1; - } - return startNodeAsync(settings, defaultMinMasterNodes); - } - - private synchronized Async startNodeAsync(final Settings settings, int defaultMinMasterNodes) { - final NodeAndClient buildNode = buildNode(settings, defaultMinMasterNodes); - final Future submit = executor.submit(() -> { - buildNode.startNode(); - publishNode(buildNode); - return buildNode.name; - }); - return () -> submit.get(); - } - - - /** - * Starts multiple nodes in an async manner and returns future with its name. - */ - public synchronized Async> startNodesAsync(final int numNodes) { - return startNodesAsync(numNodes, Settings.EMPTY); - } - - /** - * Starts multiple nodes in an async manner with the given settings and returns future with its name. - */ - public synchronized Async> startNodesAsync(final int numNodes, final Settings settings) { - final int defaultMinMasterNodes; - if (autoManageMinMasterNodes) { - int mastersDelta = Node.NODE_MASTER_SETTING.get(settings) ? numNodes : 0; - defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); - } else { - defaultMinMasterNodes = -1; - } - final List> asyncs = new ArrayList<>(); - for (int i = 0; i < numNodes; i++) { - asyncs.add(startNodeAsync(settings, defaultMinMasterNodes)); - } - - return () -> { - List ids = new ArrayList<>(); - for (Async async : asyncs) { - ids.add(async.get()); - } - return ids; - }; - } - - /** - * Starts multiple nodes (based on the number of settings provided) in an async manner, with explicit settings for each node. - * The order of the node names returned matches the order of the settings provided. - */ - public synchronized Async> startNodesAsync(final Settings... settings) { - final int defaultMinMasterNodes; - if (autoManageMinMasterNodes) { - int mastersDelta = (int) Stream.of(settings).filter(Node.NODE_MASTER_SETTING::get).count(); - defaultMinMasterNodes = updateMinMasterNodes(getMasterNodesCount() + mastersDelta); - } else { - defaultMinMasterNodes = -1; - } - List> asyncs = new ArrayList<>(); - for (Settings setting : settings) { - asyncs.add(startNodeAsync(setting, defaultMinMasterNodes)); - } - return () -> { - List ids = new ArrayList<>(); - for (Async async : asyncs) { - ids.add(async.get()); - } - return ids; - }; - } - private synchronized void publishNode(NodeAndClient nodeAndClient) { assert !nodeAndClient.node().isClosed(); nodes.put(nodeAndClient.name, nodeAndClient); @@ -2121,14 +2049,4 @@ public final class InternalTestCluster extends TestCluster { } } } - - /** - * Simple interface that allows to wait for an async operation to finish - * - * @param the result of the async execution - */ - public interface Async { - T get() throws ExecutionException, InterruptedException; - } - }