From 12f5b02fd0364de99b5d2d035cc7ffb579b43671 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 23 Jan 2019 13:53:37 +0100 Subject: [PATCH] Streamline skip_unavailable handling (#37672) This commit moves the collectSearchShards method out of RemoteClusterService into TransportSearchAction that currently calls it. RemoteClusterService used to be used only for cross-cluster search but is now also used in cross-cluster replication where different API are called through the RemoteClusterAwareClient. There is no reason for the collectSearchShards and fetchShards methods to be respectively in RemoteClusterService and RemoteClusterConnection. The search shards API can be called through the RemoteClusterAwareClient too, the only missing bit is a way to handle failures based on the skip_unavailable setting for each cluster (currently only supported in RemoteClusterConnection#fetchShards) which is achieved by adding a isSkipUnavailable(String clusterAlias) method to RemoteClusterService. This change is useful for #32125 as we will very soon need to also call the search API against remote clusters, which will be done through RemoteClusterAwareClient. In that case we will also need to support skip_unavailable when calling the search API so we need some way to handle the skip_unavailable setting like we currently do for the search_shards call. Relates to #32125 --- .../shards/ClusterSearchShardsResponse.java | 4 - .../action/search/TransportSearchAction.java | 95 ++++++-- .../transport/RemoteClusterConnection.java | 62 +----- .../transport/RemoteClusterService.java | 65 +----- .../search/TransportSearchActionTests.java | 210 +++++++++++++++--- .../RemoteClusterAwareClientTests.java | 140 ++++++++++++ .../RemoteClusterConnectionTests.java | 203 +---------------- .../transport/RemoteClusterServiceTests.java | 209 +++-------------- 8 files changed, 445 insertions(+), 543 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index 57407bd61fb..c8889c86c1d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -29,15 +29,11 @@ import org.elasticsearch.search.internal.AliasFilter; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject { - public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], - new DiscoveryNode[0], Collections.emptyMap()); - private final ClusterSearchShardsGroup[] groups; private final DiscoveryNode[] nodes; private final Map indicesAndFilters; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 3f03c521df5..30e030eca73 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -22,10 +22,12 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -39,6 +41,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; @@ -50,6 +53,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -60,8 +64,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -195,17 +202,23 @@ public class TransportSearchAction extends HandledTransportAction null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { - remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), - searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, - remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - clusters); - }, listener::onFailure)); + AtomicInteger skippedClusters = new AtomicInteger(0); + collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, + remoteClusterIndices, remoteClusterService, threadPool, + ActionListener.wrap( + searchShardsResponses -> { + List remoteShardIterators = new ArrayList<>(); + Map remoteAliasFilters = new HashMap<>(); + BiFunction clusterNodeLookup = processRemoteShards( + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int localClusters = localIndices == null ? 0 : 1; + int totalClusters = remoteClusterIndices.size() + localClusters; + int successfulClusters = searchShardsResponses.size() + localClusters; + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, + new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); + }, + listener::onFailure)); } }, listener::onFailure); if (searchRequest.source() == null) { @@ -216,18 +229,56 @@ public class TransportSearchAction extends HandledTransportAction remoteIndices, - Map searchShardsResponses) { - int localClusters = localIndices == null ? 0 : 1; - int totalClusters = remoteIndices.size() + localClusters; - int successfulClusters = localClusters; - for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { - if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) { - successfulClusters++; - } + static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, + Map remoteIndicesByCluster, RemoteClusterService remoteClusterService, + ThreadPool threadPool, ActionListener> listener) { + final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); + final Map searchShardsResponses = new ConcurrentHashMap<>(); + final AtomicReference transportException = new AtomicReference<>(); + for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { + final String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + final String[] indices = entry.getValue().indices(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) + .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); + clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener() { + @Override + public void onResponse(ClusterSearchShardsResponse response) { + searchShardsResponses.put(clusterAlias, response); + maybeFinish(); + } + + @Override + public void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + RemoteTransportException exception = + new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + if (transportException.compareAndSet(null, exception) == false) { + transportException.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } + maybeFinish(); + } + + private void maybeFinish() { + if (responsesCountDown.countDown()) { + RemoteTransportException exception = transportException.get(); + if (exception == null) { + listener.onResponse(searchShardsResponses); + } else { + listener.onFailure(transportException.get()); + } + } + } + } + ); } - int skippedClusters = totalClusters - successfulClusters; - return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); } static BiFunction processRemoteShards(Map searchShardsResponses, diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 7ea55925262..d7e3de92e40 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -25,9 +25,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -62,7 +59,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -172,6 +168,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos this.skipUnavailable = skipUnavailable; } + /** + * Returns whether this cluster is configured to be skipped when unavailable + */ + boolean isSkipUnavailable() { + return skipUnavailable; + } + @Override public void onNodeDisconnected(DiscoveryNode node) { boolean remove = connectedNodes.remove(node); @@ -181,31 +184,11 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos } } - /** - * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end. - */ - public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, - ActionListener listener) { - - final ActionListener searchShardsListener; - final Consumer onConnectFailure; - if (skipUnavailable) { - onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY); - searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY)); - } else { - onConnectFailure = listener::onFailure; - searchShardsListener = listener; - } - // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on - // the skip_unavailable setting - ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure)); - } - /** * Ensures that this cluster is connected. If the cluster is connected this operation * will invoke the listener immediately. */ - public void ensureConnected(ActionListener voidActionListener) { + void ensureConnected(ActionListener voidActionListener) { if (connectedNodes.size() == 0) { connectHandler.connect(voidActionListener); } else { @@ -213,35 +196,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos } } - private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, - final ActionListener listener) { - final DiscoveryNode node = getAnyConnectedNode(); - Transport.Connection connection = connectionManager.getConnection(node); - transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - - @Override - public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - return new ClusterSearchShardsResponse(in); - } - - @Override - public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - listener.onResponse(clusterSearchShardsResponse); - } - - @Override - public void handleException(TransportException e) { - listener.onFailure(e); - } - - @Override - public String executor() { - return ThreadPool.Names.SEARCH; - } - }); - } - /** * Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function * that returns null if the node ID is not found. diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d9fcb01df4c..7d19b2eebcb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -24,8 +24,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; @@ -50,10 +48,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -287,7 +283,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl String clusterAlias = entry.getKey(); List originalIndices = entry.getValue(); originalIndicesMap.put(clusterAlias, - new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions)); + new OriginalIndices(originalIndices.toArray(new String[0]), indicesOptions)); } } } else { @@ -311,55 +307,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl return remoteClusters.keySet(); } - public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, - Map remoteIndicesByCluster, - ActionListener> listener) { - final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); - final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); - for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { - final String clusterName = entry.getKey(); - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterName); - } - final String[] indices = entry.getValue().indices(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) - .indicesOptions(indicesOptions).local(true).preference(preference) - .routing(routing); - remoteClusterConnection.fetchSearchShards(searchShardsRequest, - new ActionListener() { - @Override - public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - searchShardsResponses.put(clusterName, clusterSearchShardsResponse); - if (responsesCountDown.countDown()) { - RemoteTransportException exception = transportException.get(); - if (exception == null) { - listener.onResponse(searchShardsResponses); - } else { - listener.onFailure(transportException.get()); - } - } - } - - @Override - public void onFailure(Exception e) { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - exception = transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - if (responsesCountDown.countDown()) { - listener.onFailure(exception); - } - } - }); - } - } - /** * Returns a connection to the given node on the given remote cluster * @throws IllegalArgumentException if the remote cluster is unknown @@ -376,6 +323,13 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } + /** + * Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable + */ + public boolean isSkipUnavailable(String clusterAlias) { + return getRemoteClusterConnection(clusterAlias).isSkipUnavailable(); + } + public Transport.Connection getConnection(String cluster) { return getRemoteClusterConnection(cluster).getConnection(); } @@ -399,7 +353,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } - synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { + private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); if (remote != null) { remote.updateSkipUnavailable(skipUnavailable); @@ -510,5 +464,4 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl Collection getConnections() { return remoteClusters.values(); } - } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 16ff4389d7c..1b99beee65e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -38,13 +39,19 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.RemoteClusterConnectionTests; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteClusterServiceTests; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -53,13 +60,22 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.startsWith; public class TransportSearchActionTests extends ESTestCase { @@ -304,41 +320,169 @@ public class TransportSearchActionTests extends ESTestCase { } } - public void testBuildClusters() { - OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices(); - Map remoteIndices = new HashMap<>(); - Map searchShardsResponses = new HashMap<>(); - int numRemoteClusters = randomIntBetween(0, 10); - boolean onlySuccessful = randomBoolean(); - int localClusters = localIndices == null ? 0 : 1; - int total = numRemoteClusters + localClusters; - int successful = localClusters; - int skipped = 0; - for (int i = 0; i < numRemoteClusters; i++) { - String cluster = randomAlphaOfLengthBetween(5, 10); - remoteIndices.put(cluster, randomOriginalIndices()); - if (onlySuccessful || randomBoolean()) { - //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); - successful++; - } else { - searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); - skipped++; - } - } - SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses); - assertEquals(total, clusters.getTotal()); - assertEquals(successful, clusters.getSuccessful()); - assertEquals(skipped, clusters.getSkipped()); + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); } - private static OriginalIndices randomOriginalIndices() { - int numLocalIndices = randomIntBetween(0, 5); - String[] localIndices = new String[numLocalIndices]; - for (int i = 0; i < numLocalIndices; i++) { - localIndices[i] = randomAlphaOfLengthBetween(3, 10); + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + for (int i = 0; i < numClusters; i++) { + List knownNodes = new CopyOnWriteArrayList<>(); + MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes); + mockTransportServices[i] = remoteSeedTransport; + DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); + knownNodes.add(remoteSeedNode); + nodes[i] = remoteSeedNode; + builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + } + Settings settings = builder.build(); + + try { + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(0); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } + + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } + } + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference failure = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } + + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); + assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + if (disconnectedNodesIndices.contains(i)) { + assertFalse(map.containsKey(clusterAlias)); + } else { + assertNotNull(map.get(clusterAlias)); + } + } + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + assertNotNull(map.get(clusterAlias)); + } + } + assertEquals(0, service.getConnectionManager().size()); + } + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } } - return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(), - randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java new file mode 100644 index 00000000000..1a6eaff9e5a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class RemoteClusterAwareClientTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); + } + + public void testSearchShards() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), responseLatch)); + responseLatch.await(); + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + } + } + } + } + + public void testSearchShardsThreadContextHeader() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + int numThreads = 10; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + final String threadId = Integer.toString(i); + executorService.submit(() -> { + ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); + threadContext.putHeader("threadId", threadId); + AtomicReference reference = new AtomicReference<>(); + final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + CountDownLatch responseLatch = new CountDownLatch(1); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap( + resp -> { + reference.set(resp); + assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); + }, + e -> fail("no failures expected")), responseLatch)); + try { + responseLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + }); + } + ThreadPool.terminate(executorService, 5, TimeUnit.SECONDS); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 02e701ed4bc..308d330d54f 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -43,7 +42,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; @@ -558,7 +556,7 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } - private List>> seedNodes(final DiscoveryNode... seedNodes) { + private static List>> seedNodes(final DiscoveryNode... seedNodes) { if (seedNodes.length == 0) { return Collections.emptyList(); } else if (seedNodes.length == 1) { @@ -570,205 +568,6 @@ public class RemoteClusterConnectionTests extends ESTestCase { } } - public void testFetchShards() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - if (randomBoolean()) { - updateSeedNodes(connection, seedNodes); - } - if (randomBoolean()) { - connection.updateSkipUnavailable(randomBoolean()); - } - SearchRequest request = new SearchRequest("test-index"); - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - responseLatch.await(); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsThreadContextHeader() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - SearchRequest request = new SearchRequest("test-index"); - Thread[] threads = new Thread[10]; - for (int i = 0; i < threads.length; i++) { - final String threadId = Integer.toString(i); - threads[i] = new Thread(() -> { - ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); - threadContext.putHeader("threadId", threadId); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - CountDownLatch responseLatch = new CountDownLatch(1); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap( - resp -> { - reference.set(resp); - assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); - }, - failReference::set), responseLatch)); - try { - responseLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - }); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsSkipUnavailable() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { - ConnectionManager connectionManager = connection.getConnectionManager(); - - SearchRequest request = new SearchRequest("test-index"); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - - CountDownLatch disconnectedLatch = new CountDownLatch(1); - connectionManager.addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (node.equals(seedNode)) { - disconnectedLatch.countDown(); - } - } - }); - - service.addFailToSendNoConnectRule(seedTransport); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap((s) -> { - reference.set(s); - }, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(failReference.get()); - assertNull(reference.get()); - assertThat(failReference.get(), instanceOf(TransportException.class)); - } - - connection.updateSkipUnavailable(true); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response == ClusterSearchShardsResponse.EMPTY); - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - - service.clearAllRules(); - //check that we reconnect once the node is back up - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - } - } - } - } - public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index d5671eec219..60f3ece86bc 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -20,9 +20,7 @@ package org.elasticsearch.transport; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -33,7 +31,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; @@ -60,9 +57,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -711,172 +706,6 @@ public class RemoteClusterServiceTests extends ESTestCase { } } - public void testCollectSearchShards() throws Exception { - int numClusters = randomIntBetween(2, 10); - MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; - DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; - Map remoteIndicesByCluster = new HashMap<>(); - Settings.Builder builder = Settings.builder(); - for (int i = 0; i < numClusters; i++) { - List knownNodes = new CopyOnWriteArrayList<>(); - MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT); - mockTransportServices[i] = remoteSeedTransport; - DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); - knownNodes.add(remoteSeedNode); - nodes[i] = remoteSeedNode; - builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); - remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); - } - Settings settings = builder.build(); - - try { - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { - assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); - remoteClusterService.initializeRemoteClusters(); - assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertEquals(1, shardsResponse.getNodes().length); - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", - null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); - assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); - } - int numDisconnectedClusters = randomIntBetween(1, numClusters); - Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); - Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); - while(disconnectedNodes.size() < numDisconnectedClusters) { - int i = randomIntBetween(0, numClusters - 1); - if (disconnectedNodes.add(nodes[i])) { - assertTrue(disconnectedNodesIndices.add(i)); - } - } - - CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - for (RemoteClusterConnection connection : remoteClusterService.getConnections()) { - connection.getConnectionManager().addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); - } - } - }); - } - - for (DiscoveryNode disconnectedNode : disconnectedNodes) { - service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); - } - - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); - } - - //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again - for (int i : disconnectedNodesIndices) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - if (disconnectedNodesIndices.contains(i)) { - assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY); - } else { - assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); - } - } - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); - - service.clearAllRules(); - if (randomBoolean()) { - for (int i : disconnectedNodesIndices) { - if (randomBoolean()) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertNotSame(ClusterSearchShardsResponse.EMPTY, shardsResponse); - } - } - assertEquals(0, service.getConnectionManager().size()); - } - } - } finally { - for (MockTransportService mockTransportService : mockTransportServices) { - mockTransportService.close(); - } - } - } - public void testRemoteClusterSkipIfDisconnectedSetting() { { Settings settings = Settings.builder() @@ -1079,7 +908,7 @@ public class RemoteClusterServiceTests extends ESTestCase { } } - private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) + private static void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); @@ -1093,4 +922,40 @@ public class RemoteClusterServiceTests extends ESTestCase { throw exceptionAtomicReference.get(); } } + + public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { + RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); + connection.updateSkipUnavailable(skipUnavailable); + } + + public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { + for (RemoteClusterConnection connection : service.getConnections()) { + ConnectionManager connectionManager = connection.getConnectionManager(); + connectionManager.addListener(listener); + } + } + + public void testSkipUnavailable() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedNode); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + + if (randomBoolean()) { + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true); + assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + } + } }