Streamline skip_unavailable handling (#37672)

This commit moves the collectSearchShards method out of RemoteClusterService into TransportSearchAction that currently calls it. RemoteClusterService used to be used only for cross-cluster search but is now also used in cross-cluster replication where different API are called through the RemoteClusterAwareClient. There is no reason for the collectSearchShards and fetchShards methods to be respectively in RemoteClusterService and RemoteClusterConnection. The search shards API can be called through the RemoteClusterAwareClient too, the only missing bit is a way to handle failures based on the skip_unavailable setting for each cluster (currently only supported in RemoteClusterConnection#fetchShards) which is achieved by adding a isSkipUnavailable(String clusterAlias) method to RemoteClusterService.
This change is useful for #32125 as we will very soon need to also call the search API against remote clusters, which will be done through RemoteClusterAwareClient. In that case we will also need to support skip_unavailable when calling the search API so we need some way to handle the skip_unavailable setting like we currently do for the search_shards call.

Relates to #32125
This commit is contained in:
Luca Cavanna 2019-01-23 13:53:37 +01:00 committed by GitHub
parent d5139e0590
commit 12f5b02fd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 445 additions and 543 deletions

View File

@ -29,15 +29,11 @@ import org.elasticsearch.search.internal.AliasFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject {
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
new DiscoveryNode[0], Collections.emptyMap());
private final ClusterSearchShardsGroup[] groups;
private final DiscoveryNode[] nodes;
private final Map<String, AliasFilter> indicesAndFilters;

View File

@ -22,10 +22,12 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -39,6 +41,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
@ -50,6 +53,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
@ -60,8 +64,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
@ -195,17 +202,23 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters,
remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
clusters);
}, listener::onFailure));
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
},
listener::onFailure));
}
}, listener::onFailure);
if (searchRequest.source() == null) {
@ -216,18 +229,56 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteIndices.size() + localClusters;
int successfulClusters = localClusters;
for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) {
successfulClusters++;
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterAlias = entry.getKey();
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
searchShardsResponses.put(clusterAlias, response);
maybeFinish();
}
@Override
public void onFailure(Exception e) {
if (skipUnavailable) {
skippedClusters.incrementAndGet();
} else {
RemoteTransportException exception =
new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
}
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
maybeFinish();
}
private void maybeFinish() {
if (responsesCountDown.countDown()) {
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
}
}
);
}
}
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,

View File

@ -25,9 +25,6 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -62,7 +59,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -172,6 +168,13 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
this.skipUnavailable = skipUnavailable;
}
/**
* Returns whether this cluster is configured to be skipped when unavailable
*/
boolean isSkipUnavailable() {
return skipUnavailable;
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
boolean remove = connectedNodes.remove(node);
@ -181,31 +184,11 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
}
/**
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
*/
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
ActionListener<ClusterSearchShardsResponse> listener) {
final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
final Consumer<Exception> onConnectFailure;
if (skipUnavailable) {
onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
} else {
onConnectFailure = listener::onFailure;
searchShardsListener = listener;
}
// in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
// the skip_unavailable setting
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
}
/**
* Ensures that this cluster is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(ActionListener<Void> voidActionListener) {
void ensureConnected(ActionListener<Void> voidActionListener) {
if (connectedNodes.size() == 0) {
connectHandler.connect(voidActionListener);
} else {
@ -213,35 +196,6 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
}
}
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
return new ClusterSearchShardsResponse(in);
}
@Override
public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
listener.onResponse(clusterSearchShardsResponse);
}
@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}
@Override
public String executor() {
return ThreadPool.Names.SEARCH;
}
});
}
/**
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
* that returns <code>null</code> if the node ID is not found.

View File

@ -24,8 +24,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
@ -50,10 +48,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@ -287,7 +283,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
new OriginalIndices(originalIndices.toArray(new String[0]), indicesOptions));
}
}
} else {
@ -311,55 +307,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.keySet();
}
public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing,
Map<String, OriginalIndices> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
}
final String[] indices = entry.getValue().indices();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference)
.routing(routing);
remoteClusterConnection.fetchSearchShards(searchShardsRequest,
new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
}
@Override
public void onFailure(Exception e) {
RemoteTransportException exception =
new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
if (responsesCountDown.countDown()) {
listener.onFailure(exception);
}
}
});
}
}
/**
* Returns a connection to the given node on the given remote cluster
* @throws IllegalArgumentException if the remote cluster is unknown
@ -376,6 +323,13 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
}
/**
* Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable
*/
public boolean isSkipUnavailable(String clusterAlias) {
return getRemoteClusterConnection(clusterAlias).isSkipUnavailable();
}
public Transport.Connection getConnection(String cluster) {
return getRemoteClusterConnection(cluster).getConnection();
}
@ -399,7 +353,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {});
}
synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
if (remote != null) {
remote.updateSkipUnavailable(skipUnavailable);
@ -510,5 +464,4 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Collection<RemoteClusterConnection> getConnections() {
return remoteClusters.values();
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
@ -38,13 +39,19 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.RemoteClusterConnectionTests;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterServiceTests;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -53,13 +60,22 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
public class TransportSearchActionTests extends ESTestCase {
@ -304,41 +320,169 @@ public class TransportSearchActionTests extends ESTestCase {
}
}
public void testBuildClusters() {
OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponses = new HashMap<>();
int numRemoteClusters = randomIntBetween(0, 10);
boolean onlySuccessful = randomBoolean();
int localClusters = localIndices == null ? 0 : 1;
int total = numRemoteClusters + localClusters;
int successful = localClusters;
int skipped = 0;
for (int i = 0; i < numRemoteClusters; i++) {
String cluster = randomAlphaOfLengthBetween(5, 10);
remoteIndices.put(cluster, randomOriginalIndices());
if (onlySuccessful || randomBoolean()) {
//whatever response counts as successful as long as it's not the empty placeholder
searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null));
successful++;
} else {
searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY);
skipped++;
}
}
SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses);
assertEquals(total, clusters.getTotal());
assertEquals(successful, clusters.getSuccessful());
assertEquals(skipped, clusters.getSkipped());
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes) {
return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool);
}
private static OriginalIndices randomOriginalIndices() {
int numLocalIndices = randomIntBetween(0, 5);
String[] localIndices = new String[numLocalIndices];
for (int i = 0; i < numLocalIndices; i++) {
localIndices[i] = randomAlphaOfLengthBetween(3, 10);
public void testCollectSearchShards() throws Exception {
int numClusters = randomIntBetween(2, 10);
MockTransportService[] mockTransportServices = new MockTransportService[numClusters];
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numClusters; i++) {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes);
mockTransportServices[i] = remoteSeedTransport;
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
Settings settings = builder.build();
try {
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertEquals(1, shardsResponse.getNodes().length);
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> failure = new AtomicReference<>();
AtomicInteger skippedClusters = new AtomicInteger(0);
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while (disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Exception> failure = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters - disconnectedNodesIndices.size(), map.size());
assertEquals(skippedClusters.get(), disconnectedNodesIndices.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
if (disconnectedNodesIndices.contains(i)) {
assertFalse(map.containsKey(clusterAlias));
} else {
assertNotNull(map.get(clusterAlias));
}
}
}
//give transport service enough time to realize that the node is down, and to notify the connection listeners
//so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true);
}
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger skippedClusters = new AtomicInteger(0);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters,
remoteIndicesByCluster, remoteClusterService, threadPool,
new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertEquals(0, skippedClusters.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
assertNotNull(map.get(clusterAlias));
}
}
assertEquals(0, service.getConnectionManager().size());
}
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
}
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteClusterAwareClientTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes) {
return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool);
}
public void testSearchShards() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) {
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) {
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
client.admin().cluster().searchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), responseLatch));
responseLatch.await();
assertNotNull(reference.get());
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes()));
}
}
}
}
public void testSearchShardsThreadContextHeader() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) {
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) {
SearchRequest request = new SearchRequest("test-index");
int numThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
final String threadId = Integer.toString(i);
executorService.submit(() -> {
ThreadContext threadContext = seedTransport.threadPool.getThreadContext();
threadContext.putHeader("threadId", threadId);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
CountDownLatch responseLatch = new CountDownLatch(1);
client.admin().cluster().searchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(
resp -> {
reference.set(resp);
assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId"));
},
e -> fail("no failures expected")), responseLatch));
try {
responseLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertNotNull(reference.get());
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes()));
});
}
ThreadPool.terminate(executorService, 5, TimeUnit.SECONDS);
}
}
}
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -43,7 +42,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.core.internal.io.IOUtils;
@ -558,7 +556,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
private List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes(final DiscoveryNode... seedNodes) {
private static List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes(final DiscoveryNode... seedNodes) {
if (seedNodes.length == 0) {
return Collections.emptyList();
} else if (seedNodes.length == 1) {
@ -570,205 +568,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testFetchShards() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
if (randomBoolean()) {
updateSeedNodes(connection, seedNodes);
}
if (randomBoolean()) {
connection.updateSkipUnavailable(randomBoolean());
}
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
responseLatch.await();
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes()));
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testFetchShardsThreadContextHeader() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
final List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes = seedNodes(seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes, service, Integer.MAX_VALUE, n -> true, null)) {
SearchRequest request = new SearchRequest("test-index");
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
final String threadId = Integer.toString(i);
threads[i] = new Thread(() -> {
ThreadContext threadContext = seedTransport.threadPool.getThreadContext();
threadContext.putHeader("threadId", threadId);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
CountDownLatch responseLatch = new CountDownLatch(1);
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(
resp -> {
reference.set(resp);
assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId"));
},
failReference::set), responseLatch));
try {
responseLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get();
assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes()));
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
assertTrue(connection.assertNoRunningConnections());
}
}
}
}
public void testFetchShardsSkipUnavailable() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedNode);
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) {
ConnectionManager connectionManager = connection.getConnectionManager();
SearchRequest request = new SearchRequest("test-index");
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(10, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response != ClusterSearchShardsResponse.EMPTY);
assertEquals(knownNodes, Arrays.asList(response.getNodes()));
}
CountDownLatch disconnectedLatch = new CountDownLatch(1);
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (node.equals(seedNode)) {
disconnectedLatch.countDown();
}
}
});
service.addFailToSendNoConnectRule(seedTransport);
if (randomBoolean()) {
connection.updateSkipUnavailable(false);
}
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap((s) -> {
reference.set(s);
}, failReference::set), responseLatch));
assertTrue(responseLatch.await(10, TimeUnit.SECONDS));
assertNotNull(failReference.get());
assertNull(reference.get());
assertThat(failReference.get(), instanceOf(TransportException.class));
}
connection.updateSkipUnavailable(true);
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(10, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response == ClusterSearchShardsResponse.EMPTY);
}
//give transport service enough time to realize that the node is down, and to notify the connection listeners
//so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next
assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS));
if (randomBoolean()) {
connection.updateSkipUnavailable(false);
}
service.clearAllRules();
//check that we reconnect once the node is back up
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(10, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response != ClusterSearchShardsResponse.EMPTY);
assertEquals(knownNodes, Arrays.asList(response.getNodes()));
}
}
}
}
}
public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);

View File

@ -20,9 +20,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
@ -33,7 +31,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
@ -60,9 +57,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@ -711,172 +706,6 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
public void testCollectSearchShards() throws Exception {
int numClusters = randomIntBetween(2, 10);
MockTransportService[] mockTransportServices = new MockTransportService[numClusters];
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numClusters; i++) {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT);
mockTransportServices[i] = remoteSeedTransport;
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
Settings settings = builder.build();
try {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) {
assertFalse(remoteClusterService.isCrossClusterSearchEnabled());
remoteClusterService.initializeRemoteClusters();
assertTrue(remoteClusterService.isCrossClusterSearchEnabled());
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertEquals(1, shardsResponse.getNodes().length);
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found",
null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while(disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
for (RemoteClusterConnection connection : remoteClusterService.getConnections()) {
connection.getConnectionManager().addListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
}
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
remoteClusterService.updateSkipUnavailable("remote" + i, true);
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
if (disconnectedNodesIndices.contains(i)) {
assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY);
} else {
assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY);
}
}
}
//give transport service enough time to realize that the node is down, and to notify the connection listeners
//so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next
assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
remoteClusterService.updateSkipUnavailable("remote" + i, true);
}
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
awaitLatch(latch, 5, TimeUnit.SECONDS);
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertNotSame(ClusterSearchShardsResponse.EMPTY, shardsResponse);
}
}
assertEquals(0, service.getConnectionManager().size());
}
}
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
}
public void testRemoteClusterSkipIfDisconnectedSetting() {
{
Settings settings = Settings.builder()
@ -1079,7 +908,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List<String> addresses, String proxyAddress)
private static void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List<String> addresses, String proxyAddress)
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
@ -1093,4 +922,40 @@ public class RemoteClusterServiceTests extends ESTestCase {
throw exceptionAtomicReference.get();
}
}
public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) {
RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias);
connection.updateSkipUnavailable(skipUnavailable);
}
public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) {
for (RemoteClusterConnection connection : service.getConnections()) {
ConnectionManager connectionManager = connection.getConnectionManager();
connectionManager.addListener(listener);
}
}
public void testSkipUnavailable() {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedNode);
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString());
try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
if (randomBoolean()) {
updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false);
assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
}
updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true);
assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1"));
}
}
}
}