[TEST] Wait for tribe node to be fully connected before shutting it down

The tribe was being shutdown by the test while a publishing round (that adds the tribe node to a cluster) is not completed yet (i.e. the node itself
knows that it became part of the cluster, and the test shuts the tribe node down, but another node has not applied the cluster state yet, which makes
that node hang while trying to connect to the node that is shutting down (due to connect_timeout being 30 seconds), delaying publishing for 30
seconds, and subsequently tripping an assertion when another tribe instance wants to join.

Relates to #23695
This commit is contained in:
Yannick Welsch 2017-04-24 12:24:51 +02:00
parent 6d6a230f70
commit 7c395070e2
1 changed files with 21 additions and 40 deletions

View File

@ -211,6 +211,27 @@ public class TribeIT extends ESIntegTestCase {
private Releasable startTribeNode(Predicate<InternalTestCluster> filter, Settings settings) throws Exception {
final String node = internalCluster().startNode(createTribeSettings(filter).put(settings).build());
// wait for node to be connected to all tribe clusters
final Set<String> expectedNodes = Sets.newHashSet(internalCluster().getNodeNames());
doWithAllClusters(filter, c -> {
// Adds the tribe client node dedicated to this remote cluster
for (String tribeNode : internalCluster().getNodeNames()) {
expectedNodes.add(tribeNode + "/" + c.getClusterName());
}
// Adds the remote clusters nodes names
Collections.addAll(expectedNodes, c.getNodeNames());
});
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState();
Set<String> nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet());
assertThat(nodes, containsInAnyOrder(expectedNodes.toArray()));
});
// wait for join to be fully applied on all nodes in the tribe clusters, see https://github.com/elastic/elasticsearch/issues/23695
doWithAllClusters(filter, c -> {
assertFalse(c.client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get().isTimedOut());
});
return () -> {
try {
while(internalCluster().getNodeNames().length > 0) {
@ -256,9 +277,6 @@ public class TribeIT extends ESIntegTestCase {
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
ensureGreen(cluster2.client());
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// Wait for the tribe node to retrieve the indices into its cluster state
assertIndicesExist(client(), "test1", "test2");
@ -294,9 +312,6 @@ public class TribeIT extends ESIntegTestCase {
assertAcked(cluster2.client().admin().indices().prepareCreate("block_test2"));
ensureGreen(cluster2.client());
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// Wait for the tribe node to retrieve the indices into its cluster state
assertIndicesExist(client(), "test1", "test2", "block_test1", "block_test2");
@ -328,9 +343,6 @@ public class TribeIT extends ESIntegTestCase {
assertAcked(cluster2.client().admin().indices().prepareCreate("conflict"));
ensureGreen(cluster2.client());
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// Wait for the tribe node to retrieve the indices into its cluster state
assertIndicesExist(client(), "test1", "test2");
@ -358,9 +370,6 @@ public class TribeIT extends ESIntegTestCase {
assertAcked(cluster2.client().admin().indices().prepareCreate("shared"));
ensureGreen(cluster2.client());
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// Wait for the tribe node to retrieve the indices into its cluster state
assertIndicesExist(client(), "test1", "test2", "shared");
@ -383,9 +392,6 @@ public class TribeIT extends ESIntegTestCase {
assertAcked(cluster2.client().admin().indices().prepareCreate("test2"));
ensureGreen(cluster2.client());
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// Wait for the tribe node to retrieve the indices into its cluster state
assertIndicesExist(client(), "test1", "test2");
@ -444,9 +450,6 @@ public class TribeIT extends ESIntegTestCase {
assertTrue(cluster1.client().admin().indices().prepareClose("first").get().isAcknowledged());
try (Releasable tribeNode = startTribeNode()) {
// Wait for the tribe node to connect to the two remote clusters
assertNodes(ALL);
// The closed index is not part of the tribe node cluster state
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertFalse(clusterState.getMetaData().hasIndex("first"));
@ -481,7 +484,6 @@ public class TribeIT extends ESIntegTestCase {
for (Predicate<InternalTestCluster> predicate : predicates) {
try (Releasable tribeNode = startTribeNode(predicate, Settings.EMPTY)) {
assertNodes(predicate);
}
}
}
@ -492,7 +494,6 @@ public class TribeIT extends ESIntegTestCase {
MergableCustomMetaData1 customMetaData1 = new MergableCustomMetaData1("a");
MergableCustomMetaData1 customMetaData2 = new MergableCustomMetaData1("b");
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
putCustomMetaData(cluster1, customMetaData1);
putCustomMetaData(cluster2, customMetaData2);
assertCustomMetaDataUpdated(internalCluster(), customMetaData2);
@ -510,7 +511,6 @@ public class TribeIT extends ESIntegTestCase {
Collections.sort(customMetaDatas, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
final MergableCustomMetaData1 tribeNodeCustomMetaData = customMetaDatas.get(0);
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
putCustomMetaData(cluster1, customMetaData1);
assertCustomMetaDataUpdated(internalCluster(), customMetaData1);
putCustomMetaData(cluster2, customMetaData2);
@ -530,7 +530,6 @@ public class TribeIT extends ESIntegTestCase {
Collections.sort(mergedCustomMetaDataType1, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
Collections.sort(mergedCustomMetaDataType2, (cm1, cm2) -> cm2.getData().compareTo(cm1.getData()));
try (Releasable tribeNode = startTribeNode()) {
assertNodes(ALL);
// test putting multiple custom md types propagates to tribe
putCustomMetaData(cluster1, firstCustomMetaDataType1);
putCustomMetaData(cluster1, firstCustomMetaDataType2);
@ -631,24 +630,6 @@ public class TribeIT extends ESIntegTestCase {
});
}
private static void assertNodes(Predicate<InternalTestCluster> filter) throws Exception {
final Set<String> expectedNodes = Sets.newHashSet(internalCluster().getNodeNames());
doWithAllClusters(filter, c -> {
// Adds the tribe client node dedicated to this remote cluster
for (String tribeNode : internalCluster().getNodeNames()) {
expectedNodes.add(tribeNode + "/" + c.getClusterName());
}
// Adds the remote clusters nodes names
Collections.addAll(expectedNodes, c.getNodeNames());
});
assertBusy(() -> {
ClusterState state = client().admin().cluster().prepareState().setNodes(true).get().getState();
Set<String> nodes = StreamSupport.stream(state.getNodes().spliterator(), false).map(DiscoveryNode::getName).collect(toSet());
assertThat(nodes, containsInAnyOrder(expectedNodes.toArray()));
});
}
private static void doWithAllClusters(Consumer<InternalTestCluster> consumer) {
doWithAllClusters(cluster -> cluster != null, consumer);
}