From 7c395070e20807926086e2e572e78d1346c8dddb Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 24 Apr 2017 12:24:51 +0200 Subject: [PATCH] [TEST] Wait for tribe node to be fully connected before shutting it down The tribe was being shutdown by the test while a publishing round (that adds the tribe node to a cluster) is not completed yet (i.e. the node itself knows that it became part of the cluster, and the test shuts the tribe node down, but another node has not applied the cluster state yet, which makes that node hang while trying to connect to the node that is shutting down (due to connect_timeout being 30 seconds), delaying publishing for 30 seconds, and subsequently tripping an assertion when another tribe instance wants to join. Relates to #23695 --- .../java/org/elasticsearch/tribe/TribeIT.java | 61 +++++++------------ 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 9e1d37681be..7c58622e5cc 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -211,6 +211,27 @@ public class TribeIT extends ESIntegTestCase { private Releasable startTribeNode(Predicate filter, Settings settings) throws Exception { final String node = internalCluster().startNode(createTribeSettings(filter).put(settings).build()); + + // wait for node to be connected to all tribe clusters + final Set expectedNodes = Sets.newHashSet(internalCluster().getNodeNames()); + doWithAllClusters(filter, c -> { + // Adds the tribe client node dedicated to this remote cluster + for (String tribeNode : internalCluster().getNodeNames()) { + expectedNodes.add(tribeNode + "/" + c.getClusterName()); + } + // Adds the remote clusters nodes names + Collections.addAll(expectedNodes, c.getNodeNames()); + }); + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); + Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); + assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); + }); + // wait for join to be fully applied on all nodes in the tribe clusters, see https://github.com/elastic/elasticsearch/issues/23695 + doWithAllClusters(filter, c -> { + assertFalse(c.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + }); + return () -> { try { while(internalCluster().getNodeNames().length > 0) { @@ -256,9 +277,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -294,9 +312,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2", "block_test1", "block_test2"); @@ -328,9 +343,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -358,9 +370,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("shared")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2", "shared"); @@ -383,9 +392,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -444,9 +450,6 @@ public class TribeIT extends ESIntegTestCase { assertTrue(cluster1.client().admin().indices().prepareClose("first").get().isAcknowledged()); try (Releasable tribeNode = startTribeNode()) { - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // The closed index is not part of the tribe node cluster state ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertFalse(clusterState.getMetaData().hasIndex("first")); @@ -481,7 +484,6 @@ public class TribeIT extends ESIntegTestCase { for (Predicate predicate : predicates) { try (Releasable tribeNode = startTribeNode(predicate, Settings.EMPTY)) { - assertNodes(predicate); } } } @@ -492,7 +494,6 @@ public class TribeIT extends ESIntegTestCase { MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a"); MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b"); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); putCustomMetaData(cluster1, customMetaData1); putCustomMetaData(cluster2, customMetaData2); assertCustomMetaDataUpdated(internalCluster(), customMetaData2); @@ -510,7 +511,6 @@ public class TribeIT extends ESIntegTestCase { Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); putCustomMetaData(cluster1, customMetaData1); assertCustomMetaDataUpdated(internalCluster(), customMetaData1); putCustomMetaData(cluster2, customMetaData2); @@ -530,7 +530,6 @@ public class TribeIT extends ESIntegTestCase { Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); // test putting multiple custom md types propagates to tribe putCustomMetaData(cluster1, firstCustomMetaDataType1); putCustomMetaData(cluster1, firstCustomMetaDataType2); @@ -631,24 +630,6 @@ public class TribeIT extends ESIntegTestCase { }); } - private static void assertNodes(Predicate filter) throws Exception { - final Set expectedNodes = Sets.newHashSet(internalCluster().getNodeNames()); - doWithAllClusters(filter, c -> { - // Adds the tribe client node dedicated to this remote cluster - for (String tribeNode : internalCluster().getNodeNames()) { - expectedNodes.add(tribeNode + "/" + c.getClusterName()); - } - // Adds the remote clusters nodes names - Collections.addAll(expectedNodes, c.getNodeNames()); - }); - - assertBusy(() -> { - ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); - Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); - assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); - }); - } - private static void doWithAllClusters(Consumer consumer) { doWithAllClusters(cluster -> cluster != null, consumer); }