diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 9e1d37681be..7c58622e5cc 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -211,6 +211,27 @@ public class TribeIT extends ESIntegTestCase { private Releasable startTribeNode(Predicate filter, Settings settings) throws Exception { final String node = internalCluster().startNode(createTribeSettings(filter).put(settings).build()); + + // wait for node to be connected to all tribe clusters + final Set expectedNodes = Sets.newHashSet(internalCluster().getNodeNames()); + doWithAllClusters(filter, c -> { + // Adds the tribe client node dedicated to this remote cluster + for (String tribeNode : internalCluster().getNodeNames()) { + expectedNodes.add(tribeNode + "/" + c.getClusterName()); + } + // Adds the remote clusters nodes names + Collections.addAll(expectedNodes, c.getNodeNames()); + }); + assertBusy(() -> { + ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); + Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); + assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); + }); + // wait for join to be fully applied on all nodes in the tribe clusters, see https://github.com/elastic/elasticsearch/issues/23695 + doWithAllClusters(filter, c -> { + assertFalse(c.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut()); + }); + return () -> { try { while(internalCluster().getNodeNames().length > 0) { @@ -256,9 +277,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -294,9 +312,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2", "block_test1", "block_test2"); @@ -328,9 +343,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("conflict")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -358,9 +370,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("shared")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2", "shared"); @@ -383,9 +392,6 @@ public class TribeIT extends ESIntegTestCase { assertAcked(cluster2.client().admin().indices().prepareCreate("test2")); ensureGreen(cluster2.client()); - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // Wait for the tribe node to retrieve the indices into its cluster state assertIndicesExist(client(), "test1", "test2"); @@ -444,9 +450,6 @@ public class TribeIT extends ESIntegTestCase { assertTrue(cluster1.client().admin().indices().prepareClose("first").get().isAcknowledged()); try (Releasable tribeNode = startTribeNode()) { - // Wait for the tribe node to connect to the two remote clusters - assertNodes(ALL); - // The closed index is not part of the tribe node cluster state ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); assertFalse(clusterState.getMetaData().hasIndex("first")); @@ -481,7 +484,6 @@ public class TribeIT extends ESIntegTestCase { for (Predicate predicate : predicates) { try (Releasable tribeNode = startTribeNode(predicate, Settings.EMPTY)) { - assertNodes(predicate); } } } @@ -492,7 +494,6 @@ public class TribeIT extends ESIntegTestCase { MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a"); MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b"); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); putCustomMetaData(cluster1, customMetaData1); putCustomMetaData(cluster2, customMetaData2); assertCustomMetaDataUpdated(internalCluster(), customMetaData2); @@ -510,7 +511,6 @@ public class TribeIT extends ESIntegTestCase { Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); putCustomMetaData(cluster1, customMetaData1); assertCustomMetaDataUpdated(internalCluster(), customMetaData1); putCustomMetaData(cluster2, customMetaData2); @@ -530,7 +530,6 @@ public class TribeIT extends ESIntegTestCase { Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData())); try (Releasable tribeNode = startTribeNode()) { - assertNodes(ALL); // test putting multiple custom md types propagates to tribe putCustomMetaData(cluster1, firstCustomMetaDataType1); putCustomMetaData(cluster1, firstCustomMetaDataType2); @@ -631,24 +630,6 @@ public class TribeIT extends ESIntegTestCase { }); } - private static void assertNodes(Predicate filter) throws Exception { - final Set expectedNodes = Sets.newHashSet(internalCluster().getNodeNames()); - doWithAllClusters(filter, c -> { - // Adds the tribe client node dedicated to this remote cluster - for (String tribeNode : internalCluster().getNodeNames()) { - expectedNodes.add(tribeNode + "/" + c.getClusterName()); - } - // Adds the remote clusters nodes names - Collections.addAll(expectedNodes, c.getNodeNames()); - }); - - assertBusy(() -> { - ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState(); - Set nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet()); - assertThat(nodes, containsInAnyOrder(expectedNodes.toArray())); - }); - } - private static void doWithAllClusters(Consumer consumer) { doWithAllClusters(cluster -> cluster != null, consumer); }