diff --git a/core/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/core/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 8fad95257a8..3f0fb77781b 100644 --- a/core/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/core/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -118,38 +118,45 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - Transport.Connection connection = remoteClusterService.getConnection(remoteIndices.getKey()); - FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); - remoteRequest.setMergeResults(false); // we need to merge on this node - remoteRequest.indicesOptions(originalIndices.indicesOptions()); - remoteRequest.indices(originalIndices.indices()); - remoteRequest.fields(request.fields()); - transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - @Override - public FieldCapabilitiesResponse newInstance() { - return new FieldCapabilitiesResponse(); - } + // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion + remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> { + Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); + remoteRequest.setMergeResults(false); // we need to merge on this node + remoteRequest.indicesOptions(originalIndices.indicesOptions()); + remoteRequest.indices(originalIndices.indices()); + remoteRequest.fields(request.fields()); + transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY, + new TransportResponseHandler() { - @Override - public void handleResponse(FieldCapabilitiesResponse response) { - for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) { - indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.buildRemoteIndexName(clusterAlias, - res.getIndexName()), res.get())); - } - onResponse.run(); - } + @Override + public FieldCapabilitiesResponse newInstance() { + return new FieldCapabilitiesResponse(); + } - @Override - public void handleException(TransportException exp) { - onResponse.run(); - } + @Override + public void handleResponse(FieldCapabilitiesResponse response) { + try { + for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) { + indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware. + buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get())); + } + } finally { + onResponse.run(); + } + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); + @Override + public void handleException(TransportException exp) { + onResponse.run(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + }, e -> onResponse.run())); } } diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index e4f0d0d4af7..2b16c26931b 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -160,12 +160,24 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo // we can't proceed with a search on a cluster level. // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller // end since they provide the listener. - connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure)); + ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure)); } else { fetchShardsInternal(searchRequest, listener); } } + /** + * Ensures that this cluster is connected. If the cluster is connected this operation + * will invoke the listener immediately. + */ + public void ensureConnected(ActionListener voidActionListener) { + if (connectedNodes.isEmpty()) { + connectHandler.connect(voidActionListener); + } else { + voidActionListener.onResponse(null); + } + } + private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener listener) { final DiscoveryNode node = nodeSupplier.get(); diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 91a5cebbcd2..621713c8ab1 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; @@ -46,6 +47,7 @@ import org.elasticsearch.search.internal.AliasFilter; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -265,6 +267,18 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl return connection.getConnection(node); } + /** + * Ensures that the given cluster alias is connected. If the cluster is connected this operation + * will invoke the listener immediately. + */ + public void ensureConnected(String clusterAlias, ActionListener listener) { + RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); + if (remoteClusterConnection == null) { + throw new IllegalArgumentException("no such remote cluster: " + clusterAlias); + } + remoteClusterConnection.ensureConnected(listener); + } + public Transport.Connection getConnection(String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 38e73c209ae..3c1181b6825 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; @@ -730,4 +731,58 @@ public class RemoteClusterConnectionTests extends ESTestCase { } return statsRef.get(); } + + public void testEnsureConnected() throws IOException, InterruptedException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + assertFalse(service.nodeConnected(seedNode)); + assertFalse(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + CountDownLatch latch = new CountDownLatch(1); + connection.ensureConnected(new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, latch)); + latch.await(); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + + // exec again we are already connected + connection.ensureConnected(new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, latch)); + latch.await(); + assertTrue(service.nodeConnected(seedNode)); + assertTrue(service.nodeConnected(discoverableNode)); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 32a672e1bbc..9b2e5fc4e53 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -189,6 +189,44 @@ public class RemoteClusterServiceTests extends ESTestCase { } } + public void testEnsureConnected() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(otherSeedTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + builder.putArray("search.remote.cluster_1.seeds", seedNode.getAddress().toString()); + builder.putArray("search.remote.cluster_2.seeds", otherSeedNode.getAddress().toString()); + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + service.updateRemoteCluster("cluster_2", Collections.singletonList(otherSeedNode.getAddress().address())); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(service.isRemoteClusterRegistered("cluster_2")); + service.updateRemoteCluster("cluster_2", Collections.emptyList()); + assertFalse(service.isRemoteClusterRegistered("cluster_2")); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, + () -> service.updateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Collections.emptyList())); + assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();