diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index a497e509c15..8cfec0a07f9 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collections; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; @@ -69,7 +70,6 @@ public class RemoteClusterClientTests extends ESTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29547") public void testEnsureWeReconnect() throws Exception { Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); try (MockTransportService remoteTransport = startTransport("remote_node", Collections.emptyList(), Version.CURRENT, threadPool, @@ -79,17 +79,35 @@ public class RemoteClusterClientTests extends ESTestCase { .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), true) .put("search.remote.test.seeds", remoteNode.getAddress().getAddress() + ":" + remoteNode.getAddress().getPort()).build(); try (MockTransportService service = MockTransportService.createNewService(localSettings, Version.CURRENT, threadPool, null)) { + Semaphore semaphore = new Semaphore(1); service.start(); + service.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (remoteNode.equals(node)) { + semaphore.release(); + } + } + }); + // this test is not perfect since we might reconnect concurrently but it will fail most of the time if we don't have + // the right calls in place in the RemoteAwareClient service.acceptIncomingRequests(); - service.disconnectFromNode(remoteNode); - RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - assertBusy(() -> assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); - Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); - ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); - assertNotNull(clusterStateResponse); - assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + for (int i = 0; i < 10; i++) { + semaphore.acquire(); + try { + service.disconnectFromNode(remoteNode); + semaphore.acquire(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); + assertNotNull(clusterStateResponse); + assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value()); + assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + } finally { + semaphore.release(); + } + } } } } - }