Cross Cluster Search: make remote clusters optional (#27182)

Today Cross Cluster Search requires at least one node in each remote cluster to be up when the cross cluster search is run. Otherwise the whole search request fails, even though some of the data (local and/or remote) is available. This happens during the _search/shards calls used to find out which remote shards the query has to be executed on. This scenario is different from the shard failures that may happen later on, when the query is actually executed, e.g. because remote shards are missing: those do not fail the whole request but rather yield partial results, and the _shards section in the response indicates that.

This commit introduces a per-cluster boolean setting called search.remote.$cluster_alias.skip_unavailable, false by default, which allows skipping certain clusters if they are down when trying to reach them through a cross cluster search request. By default all clusters are mandatory.
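
Being registered as a dynamic setting, it can also be changed at runtime through the cluster settings API. A minimal sketch, not part of this commit's diff, assuming a remote cluster registered under the hypothetical alias cluster_one:

PUT _cluster/settings
{
    "persistent" : {
        "search.remote.cluster_one.skip_unavailable" : true
    }
}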

Scroll requests support this setting too when they are first initiated (the first search request with the scroll parameter), but subsequent scroll rounds (through the _search/scroll endpoint) will fail if some of the remote clusters have gone down in the meantime.
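
For example (hypothetical index name and cluster alias), only the initiating request below is subject to skip_unavailable; the follow-up _search/scroll rounds are not:

POST /cluster_one:test-index/_search?scroll=1m
{
    "query" : {
        "match_all" : {}
    }
}

POST /_search/scroll
{
    "scroll" : "1m",
    "scroll_id" : "<scroll_id returned by the request above>"
}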

The search API response now contains a new _clusters section, similar to the _shards section, that gets returned whenever one or more clusters were disconnected and got skipped:

"_clusters" : {
    "total" : 3,
    "successful" : 2,
    "skipped" : 1
}
This section is not part of the response if no clusters have been skipped.

The value of the per-cluster skip_unavailable setting has also been added to the output of the _remote/info API.
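
As a sketch, assuming a single remote cluster registered under the hypothetical alias cluster_one (addresses and timings illustrative), GET _remote/info would now include the flag:

{
    "cluster_one" : {
        "seeds" : [ "127.0.0.1:9300" ],
        "http_addresses" : [ "127.0.0.1:9200" ],
        "connected" : true,
        "num_nodes_connected" : 1,
        "max_connections_per_cluster" : 3,
        "initial_connect_timeout" : "30s",
        "skip_unavailable" : false
    }
}
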
Luca Cavanna 2017-11-21 11:41:47 +01:00 committed by GitHub
parent dd0bb580b0
commit 29450de7b5
35 changed files with 1464 additions and 165 deletions


@ -53,6 +53,7 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY));
}
}


@ -166,7 +166,8 @@ public class RestHighLevelClientTests extends ESTestCase {
public void testSearchScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
mockResponse(mockSearchResponse);
SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
headers);


@ -470,5 +470,6 @@ public class SearchIT extends ESRestHighLevelClientTestCase {
assertThat(searchResponse.getTotalShards(), greaterThan(0));
assertEquals(searchResponse.getTotalShards(), searchResponse.getSuccessfulShards());
assertEquals(0, searchResponse.getShardFailures().length);
assertEquals(SearchResponse.Clusters.EMPTY, searchResponse.getClusters());
}
}


@ -30,11 +30,15 @@ import org.elasticsearch.search.internal.AliasFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject {
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
new DiscoveryNode[0], Collections.emptyMap());
private ClusterSearchShardsGroup[] groups;
private DiscoveryNode[] nodes;
private Map<String, AliasFilter> indicesAndFilters;


@ -68,7 +68,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
@ -76,7 +76,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
this.timeProvider = timeProvider;
this.logger = logger;
@ -90,6 +91,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
this.clusters = clusters;
}
/**
@ -108,7 +110,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
ShardSearchFailure.EMPTY_ARRAY, clusters));
return;
}
executePhase(this);
@ -264,7 +266,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), buildShardFailures());
skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters);
}
@Override


@ -33,7 +33,7 @@ import java.util.function.Function;
import java.util.stream.Stream;
/**
* This search phrase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
@ -50,13 +50,15 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters) {
/*
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
*/
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size());
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}


@ -40,10 +40,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task) {
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests());
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}


@ -40,10 +40,10 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task) {
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
request.getMaxConcurrentShardRequests());
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}


@ -26,8 +26,10 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
@ -43,6 +45,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@ -71,15 +74,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private ShardSearchFailure[] shardFailures;
private Clusters clusters;
private long tookInMillis;
public SearchResponse() {
}
public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards,
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures, Clusters clusters) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.clusters = clusters;
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
@ -199,6 +205,15 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return internalResponse.profile();
}
/**
* Returns info about what clusters the search was executed against. Available only in responses obtained
* from a Cross Cluster Search request, otherwise <code>null</code>
* @see Clusters
*/
public Clusters getClusters() {
return clusters;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -221,6 +236,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
getFailedShards(), getShardFailures());
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}
@ -242,6 +258,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
int skippedShards = 0; // 0 for BWC
String scrollId = null;
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
@ -296,6 +313,28 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
parser.skipChildren();
}
}
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName)) {
int successful = -1;
int total = -1;
int skipped = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName)) {
successful = parser.intValue();
} else if (Clusters.TOTAL_FIELD.match(currentFieldName)) {
total = parser.intValue();
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName)) {
skipped = parser.intValue();
} else {
parser.skipChildren();
}
} else {
parser.skipChildren();
}
}
clusters = new Clusters(total, successful, skipped);
} else {
parser.skipChildren();
}
@ -304,7 +343,7 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
profile, numReducePhases);
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
failures.toArray(new ShardSearchFailure[failures.size()]));
failures.toArray(new ShardSearchFailure[failures.size()]), clusters);
}
@Override
@ -322,6 +361,12 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
shardFailures[i] = readShardSearchFailure(in);
}
}
//TODO update version once backported
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
clusters = new Clusters(in);
} else {
clusters = Clusters.EMPTY;
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
@ -340,7 +385,10 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
for (ShardSearchFailure shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
}
//TODO update version once backported
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
clusters.writeTo(out);
}
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if(out.getVersion().onOrAfter(Version.V_5_6_0)) {
@ -353,4 +401,101 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return Strings.toString(this);
}
/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*/
public static class Clusters implements ToXContent, Writeable {
public static final Clusters EMPTY = new Clusters(0, 0, 0);
static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters");
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
static final ParseField SKIPPED_FIELD = new ParseField("skipped");
static final ParseField TOTAL_FIELD = new ParseField("total");
private final int total;
private final int successful;
private final int skipped;
Clusters(int total, int successful, int skipped) {
assert total >= 0 && successful >= 0 && skipped >= 0
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
assert successful <= total && skipped == total - successful
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
this.total = total;
this.successful = successful;
this.skipped = skipped;
}
private Clusters(StreamInput in) throws IOException {
this.total = in.readVInt();
this.successful = in.readVInt();
this.skipped = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeVInt(successful);
out.writeVInt(skipped);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (this != EMPTY) {
builder.startObject(_CLUSTERS_FIELD.getPreferredName());
builder.field(TOTAL_FIELD.getPreferredName(), total);
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
builder.field(SKIPPED_FIELD.getPreferredName(), skipped);
builder.endObject();
}
return builder;
}
/**
* Returns how many total clusters the search was requested to be executed on
*/
public int getTotal() {
return total;
}
/**
* Returns how many total clusters the search was executed successfully on
*/
public int getSuccessful() {
return successful;
}
/**
* Returns how many total clusters were skipped during the execution of the search request
*/
public int getSkipped() {
return skipped;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Clusters clusters = (Clusters) o;
return total == clusters.total &&
successful == clusters.successful &&
skipped == clusters.skipped;
}
@Override
public int hashCode() {
return Objects.hash(total, successful, skipped);
}
@Override
public String toString() {
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
}
}
}


@ -249,7 +249,7 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
0, buildTookInMillis(), buildShardFailures()));
0, buildTookInMillis(), buildShardFailures(), SearchResponse.Clusters.EMPTY));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}


@ -192,7 +192,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
.getDataNodes().size());
.getDataNodes().size(), SearchResponse.Clusters.EMPTY);
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
@ -200,10 +200,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+ clusterState.getNodes().getDataNodes().size();
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled);
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved,
clusters);
}, listener::onFailure));
}
}, listener::onFailure);
@ -215,6 +217,20 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
int localClusters = Math.min(localIndices.indices().length, 1);
int totalClusters = remoteIndices.size() + localClusters;
int successfulClusters = localClusters;
for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) {
successfulClusters++;
}
}
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
@ -264,7 +280,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount) {
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, int nodeCount,
SearchResponse.Clusters clusters) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
@ -329,7 +346,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start();
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start();
}
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
@ -343,9 +360,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators) {
List<SearchShardIterator> shards = new ArrayList<>();
for (SearchShardIterator shardIterator : remoteShardIterators) {
shards.add(shardIterator);
}
shards.addAll(remoteShardIterators);
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
@ -363,34 +378,35 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener, boolean preFilter) {
ActionListener<SearchResponse> listener, boolean preFilter,
SearchResponse.Clusters clusters) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup,
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false);
clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
action.start();
}
};
});
}, clusters);
} else {
AbstractSearchAsyncAction searchAsyncAction;
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
timeProvider, clusterStateVersion, task, clusters);
break;
case QUERY_AND_FETCH:
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
timeProvider, clusterStateVersion, task, clusters);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");


@ -263,6 +263,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,


@ -664,7 +664,7 @@ public class Setting<T> implements ToXContentObject {
}
/**
* Returns the namespace for a concrete settting. Ie. an affix setting with prefix: <tt>search.</tt> and suffix: <tt>username</tt>
* Returns the namespace for a concrete setting. Ie. an affix setting with prefix: <tt>search.</tt> and suffix: <tt>username</tt>
* will return <tt>remote</tt> as a namespace for the setting <tt>search.remote.username</tt>
*/
public String getNamespace(Setting<T> concreteSetting) {
@ -1220,9 +1220,7 @@ public class Setting<T> implements ToXContentObject {
AffixSetting... dependencies) {
Setting<T> delegate = delegateFactory.apply("_na_");
return new AffixSetting<>(key, delegate, delegateFactory, dependencies);
};
}
public interface Key {
boolean match(String key);


@ -53,6 +53,7 @@ public abstract class RemoteClusterAware extends AbstractComponent {
Setting.Property.NodeScope, Setting.Property.Dynamic));
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
@ -160,7 +161,7 @@ public abstract class RemoteClusterAware extends AbstractComponent {
}
}
public static final String buildRemoteIndexName(String clusterAlias, String indexName) {
public static String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias != null ? clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName;
}
}


@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -87,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
@ -117,6 +119,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
remoteProfile = builder.build();
connectedNodes = new ConnectedNodes(clusterAlias);
this.seedNodes = Collections.unmodifiableList(seedNodes);
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
}
@ -129,6 +133,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
connectHandler.connect(connectListener);
}
/**
* Updates the skipUnavailable flag that can be dynamically set for each remote cluster
*/
void updateSkipUnavailable(boolean skipUnavailable) {
this.skipUnavailable = skipUnavailable;
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
boolean remove = connectedNodes.remove(node);
@ -143,16 +154,19 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
*/
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
ActionListener<ClusterSearchShardsResponse> listener) {
if (connectedNodes.size() == 0) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
// we can't proceed with a search on a cluster level.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller
// end since they provide the listener.
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
final Consumer<Exception> onConnectFailure;
if (skipUnavailable) {
onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
} else {
fetchShardsInternal(searchRequest, listener);
onConnectFailure = listener::onFailure;
searchShardsListener = listener;
}
// in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
// the skip_unavailable setting
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
}
/**
@ -231,16 +245,12 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
});
};
try {
if (connectedNodes.size() == 0) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
// we can't proceed with a search on a cluster level.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the
// caller end since they provide the listener.
ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure));
} else {
runnable.run();
}
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
// this will cause some back pressure on the search end and eventually will cause rejections but that's fine
// we can't proceed with a search on a cluster level.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the
// caller end since they provide the listener.
ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure));
} catch (Exception ex) {
listener.onFailure(ex);
}
@ -600,7 +610,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
// not connected we return immediately
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionStats);
} else {
NodesInfoRequest request = new NodesInfoRequest();
@ -634,9 +644,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
}
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
maxNumRemoteConnections, connectedNodes.size(),
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings));
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionInfo);
}


@ -30,6 +30,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -54,6 +55,8 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.common.settings.Setting.boolSetting;
/**
* Basic service for accessing remote clusters via gateway nodes
*/
@ -89,6 +92,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true,
Setting.Property.NodeScope);
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_UNAVAILABLE =
Setting.affixKeySetting("search.remote.", "skip_unavailable",
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
private final TransportService transportService;
private final int numRemoteConnections;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
@ -231,7 +238,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
@Override
public void onFailure(Exception e) {
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
clusterName + "]", e);
clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
@ -283,6 +290,20 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return this.remoteClusters.keySet();
}
@Override
public void listenForUpdates(ClusterSettings clusterSettings) {
super.listenForUpdates(clusterSettings);
clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable,
(clusterAlias, value) -> {});
}
synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
if (remote != null) {
remote.updateSkipUnavailable(skipUnavailable);
}
}
protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}


@ -18,12 +18,12 @@
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -42,17 +42,19 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;
RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
List<TransportAddress> httpAddresses,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout) {
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.httpAddresses = httpAddresses;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.initialConnectionTimeout = initialConnectionTimeout;
this.skipUnavailable = skipUnavailable;
}
public RemoteConnectionInfo(StreamInput input) throws IOException {
@ -62,6 +64,12 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
initialConnectionTimeout = new TimeValue(input);
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
//TODO update version once backported
if (input.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
skipUnavailable = input.readBoolean();
} else {
skipUnavailable = false;
}
}
@Override
@ -82,6 +90,7 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
builder.field("initial_connect_timeout", initialConnectionTimeout);
builder.field("skip_unavailable", skipUnavailable);
}
builder.endObject();
return builder;
@ -95,6 +104,10 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
initialConnectionTimeout.writeTo(out);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
//TODO update version once backported
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(skipUnavailable);
}
}
@Override
@ -107,11 +120,13 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
Objects.equals(seedNodes, that.seedNodes) &&
Objects.equals(httpAddresses, that.httpAddresses) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias);
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
}
@Override
public int hashCode() {
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, numNodesConnected, clusterAlias);
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
}
}


@ -65,7 +65,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
request, null, new GroupShardsIterator<>(Collections.singletonList(
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) {
new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY) {
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return null;


@ -87,7 +87,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
public void run() throws IOException {
result.set(iter);
latch.countDown();
}});
}}, SearchResponse.Clusters.EMPTY);
canMatchPhase.start();
latch.await();
@ -164,7 +164,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
public void run() throws IOException {
result.set(iter);
latch.countDown();
}});
}}, SearchResponse.Clusters.EMPTY);
canMatchPhase.start();
latch.await();
@ -247,11 +247,10 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
listener.onFailure(new Exception("failure"));
}
}
});
}, SearchResponse.Clusters.EMPTY);
canMatchPhase.start();
latch.await();
executor.shutdown();
}
}


@ -84,7 +84,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
@Override
public SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0, 0,
failures.toArray(new ShardSearchFailure[0]));
failures.toArray(new ShardSearchFailure[failures.size()]), SearchResponse.Clusters.EMPTY);
}
@Override


@ -113,7 +113,8 @@ public class SearchAsyncActionTests extends ESTestCase {
0,
null,
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests()) {
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
@ -203,7 +204,8 @@ public class SearchAsyncActionTests extends ESTestCase {
0,
null,
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests()) {
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
@ -306,7 +308,8 @@ public class SearchAsyncActionTests extends ESTestCase {
0,
null,
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests()) {
request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY) {
TestSearchResponse response = new TestSearchResponse();
@Override


@ -19,8 +19,14 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@ -31,6 +37,7 @@ import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchHitsTests;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationsTests;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -40,14 +47,17 @@ import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.VersionUtils;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
@ -61,6 +71,8 @@ public class SearchResponseTests extends ESTestCase {
xContentRegistry = new NamedXContentRegistry(namedXContents);
}
private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(
new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables());
private AggregationsTests aggregationsTests = new AggregationsTests();
@Before
@ -112,8 +124,16 @@ public class SearchResponseTests extends ESTestCase {
} else {
internalSearchResponse = InternalSearchResponse.empty();
}
return new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis,
shardSearchFailures);
shardSearchFailures, randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY);
}
private static SearchResponse.Clusters randomClusters() {
int totalClusters = randomIntBetween(0, 10);
int successfulClusters = randomIntBetween(0, totalClusters);
int skippedClusters = totalClusters - successfulClusters;
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
}
/**
@ -193,29 +213,104 @@ public class SearchResponseTests extends ESTestCase {
SearchHit hit = new SearchHit(1, "id1", new Text("type"), Collections.emptyMap());
hit.score(2.0f);
SearchHit[] hits = new SearchHit[] { hit };
SearchResponse response = new SearchResponse(
new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0,
new ShardSearchFailure[0]);
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
expectedString.append("\"took\":0,");
expectedString.append("\"timed_out\":false,");
expectedString.append("\"_shards\":");
SearchResponse response = new SearchResponse(
new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY);
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
expectedString.append("{\"total\":0,");
expectedString.append("\"successful\":0,");
expectedString.append("\"skipped\":0,");
expectedString.append("\"failed\":0},");
expectedString.append("\"took\":0,");
expectedString.append("\"timed_out\":false,");
expectedString.append("\"_shards\":");
{
expectedString.append("{\"total\":0,");
expectedString.append("\"successful\":0,");
expectedString.append("\"skipped\":0,");
expectedString.append("\"failed\":0},");
}
expectedString.append("\"hits\":");
{
expectedString.append("{\"total\":100,");
expectedString.append("\"max_score\":1.5,");
expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}");
}
}
expectedString.append("\"hits\":");
expectedString.append("}");
assertEquals(expectedString.toString(), Strings.toString(response));
}
{
SearchResponse response = new SearchResponse(
new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0,
ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(5, 3, 2));
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
expectedString.append("{\"total\":100,");
expectedString.append("\"max_score\":1.5,");
expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}");
expectedString.append("\"took\":0,");
expectedString.append("\"timed_out\":false,");
expectedString.append("\"_shards\":");
{
expectedString.append("{\"total\":0,");
expectedString.append("\"successful\":0,");
expectedString.append("\"skipped\":0,");
expectedString.append("\"failed\":0},");
}
expectedString.append("\"_clusters\":");
{
expectedString.append("{\"total\":5,");
expectedString.append("\"successful\":3,");
expectedString.append("\"skipped\":2},");
}
expectedString.append("\"hits\":");
{
expectedString.append("{\"total\":100,");
expectedString.append("\"max_score\":1.5,");
expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}");
}
}
expectedString.append("}");
assertEquals(expectedString.toString(), Strings.toString(response));
}
}
public void testSerialization() throws IOException {
SearchResponse searchResponse = createTestItem(false);
BytesStreamOutput bytesStreamOutput = new BytesStreamOutput();
searchResponse.writeTo(bytesStreamOutput);
try (StreamInput in = new NamedWriteableAwareStreamInput(
StreamInput.wrap(bytesStreamOutput.bytes().toBytesRef().bytes), namedWriteableRegistry)) {
SearchResponse serialized = new SearchResponse();
serialized.readFrom(in);
assertEquals(searchResponse.getHits().totalHits, serialized.getHits().totalHits);
assertEquals(searchResponse.getHits().getHits().length, serialized.getHits().getHits().length);
assertEquals(searchResponse.getNumReducePhases(), serialized.getNumReducePhases());
assertEquals(searchResponse.getFailedShards(), serialized.getFailedShards());
assertEquals(searchResponse.getTotalShards(), serialized.getTotalShards());
assertEquals(searchResponse.getSkippedShards(), serialized.getSkippedShards());
assertEquals(searchResponse.getClusters(), serialized.getClusters());
}
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("AAAAAAAAAAAAAgABBQUAAAoAAAAAAAAA");
final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_rc2);
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(data), namedWriteableRegistry)) {
in.setVersion(version);
SearchResponse deserialized = new SearchResponse();
deserialized.readFrom(in);
assertSame(SearchResponse.Clusters.EMPTY, deserialized.getClusters());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
deserialized.writeTo(out);
try (StreamInput in2 = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes),
namedWriteableRegistry)) {
in2.setVersion(version);
SearchResponse deserialized2 = new SearchResponse();
deserialized2.readFrom(in2);
assertSame(SearchResponse.Clusters.EMPTY, deserialized2.getClusters());
}
}
}
expectedString.append("}");
assertEquals(expectedString.toString(), Strings.toString(response));
}
}


@ -238,7 +238,43 @@ public class TransportSearchActionTests extends ESTestCase {
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
assertNull(remoteAliases.get("xyz_id").getQueryBuilder());
}
}
public void testBuildClusters() {
OriginalIndices localIndices = randomOriginalIndices();
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponses = new HashMap<>();
int numRemoteClusters = randomIntBetween(0, 10);
boolean onlySuccessful = randomBoolean();
int localClusters = localIndices.indices().length == 0 ? 0 : 1;
int total = numRemoteClusters + localClusters;
int successful = localClusters;
int skipped = 0;
for (int i = 0; i < numRemoteClusters; i++) {
String cluster = randomAlphaOfLengthBetween(5, 10);
remoteIndices.put(cluster, randomOriginalIndices());
if (onlySuccessful || randomBoolean()) {
//whatever response counts as successful as long as it's not the empty placeholder
searchShardsResponses.put(cluster, new ClusterSearchShardsResponse());
successful++;
} else {
searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY);
skipped++;
}
}
SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses);
assertEquals(total, clusters.getTotal());
assertEquals(successful, clusters.getSuccessful());
assertEquals(skipped, clusters.getSkipped());
}
private static OriginalIndices randomOriginalIndices() {
int numLocalIndices = randomIntBetween(0, 5);
String[] localIndices = new String[numLocalIndices];
for (int i = 0; i < numLocalIndices; i++) {
localIndices[i] = randomAlphaOfLengthBetween(3, 10);
}
return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(),
randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
}
}


@ -53,10 +53,10 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.net.InetAddress;
@ -65,6 +65,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
@ -80,6 +81,8 @@ import java.util.function.Function;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
public class RemoteClusterConnectionTests extends ESTestCase {
@ -162,14 +165,14 @@ public class RemoteClusterConnectionTests extends ESTestCase {
public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService incomaptibleTransport = startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0"));
MockTransportService incompatibleTransport = startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0"));
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
DiscoveryNode incompatibleSeedNode = incomaptibleTransport.getLocalDiscoNode();
DiscoveryNode incompatibleSeedNode = incompatibleTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
knownNodes.add(incomaptibleTransport.getLocalDiscoNode());
knownNodes.add(incompatibleTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());
List<DiscoveryNode> seedNodes = Arrays.asList(incompatibleSeedNode, seedNode);
Collections.shuffle(seedNodes, random());
@ -366,29 +369,24 @@ public class RemoteClusterConnectionTests extends ESTestCase {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
List<DiscoveryNode> nodes = Collections.singletonList(seedNode);
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
nodes, service, Integer.MAX_VALUE, n -> true)) {
if (randomBoolean()) {
updateSeedNodes(connection, Arrays.asList(seedNode));
updateSeedNodes(connection, nodes);
}
if (randomBoolean()) {
connection.updateSkipUnavailable(randomBoolean());
}
SearchRequest request = new SearchRequest("test-index");
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
ActionListener<ClusterSearchShardsResponse> shardsListener = ActionListener.wrap(
x -> {
reference.set(x);
responseLatch.countDown();
},
x -> {
failReference.set(x);
responseLatch.countDown();
});
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(new String[]{"test-index"})
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
connection.fetchSearchShards(searchShardsRequest, shardsListener);
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
responseLatch.await();
assertNull(failReference.get());
assertNotNull(reference.get());
@ -400,6 +398,104 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testFetchShardsSkipUnavailable() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedNode);
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
SearchRequest request = new SearchRequest("test-index");
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index")
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(1, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response != ClusterSearchShardsResponse.EMPTY);
assertEquals(knownNodes, Arrays.asList(response.getNodes()));
}
CountDownLatch disconnectedLatch = new CountDownLatch(1);
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (node.equals(seedNode)) {
disconnectedLatch.countDown();
}
}
});
service.addFailToSendNoConnectRule(seedTransport);
if (randomBoolean()) {
connection.updateSkipUnavailable(false);
}
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(1, TimeUnit.SECONDS));
assertNotNull(failReference.get());
assertNull(reference.get());
assertThat(failReference.get(), instanceOf(TransportException.class));
}
connection.updateSkipUnavailable(true);
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(1, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response == ClusterSearchShardsResponse.EMPTY);
}
//give the transport service enough time to realize that the node is down and to notify the connection listeners,
//so that RemoteClusterConnection is left with no connected nodes and will retry connecting the next time it is used
assertTrue(disconnectedLatch.await(1, TimeUnit.SECONDS));
if (randomBoolean()) {
connection.updateSkipUnavailable(false);
}
service.clearAllRules();
//check that we reconnect once the node is back up
{
CountDownLatch responseLatch = new CountDownLatch(1);
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
assertTrue(responseLatch.await(1, TimeUnit.SECONDS));
assertNull(failReference.get());
assertNotNull(reference.get());
ClusterSearchShardsResponse response = reference.get();
assertTrue(response != ClusterSearchShardsResponse.EMPTY);
assertEquals(knownNodes, Arrays.asList(response.getNodes()));
}
}
}
}
}
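//What testFetchShardsSkipUnavailable exercises, as a hedged sketch (names are
//illustrative, not the exact production code): with skip_unavailable enabled, a
//connection failure is translated into the ClusterSearchShardsResponse.EMPTY
//sentinel instead of propagating as a TransportException:
//
//  void fetchSearchShardsSketch(ClusterSearchShardsRequest request,
//                               ActionListener<ClusterSearchShardsResponse> listener) {
//      ActionListener<ClusterSearchShardsResponse> wrapped = skipUnavailable
//          ? ActionListener.wrap(listener::onResponse,
//              e -> listener.onResponse(ClusterSearchShardsResponse.EMPTY))
//          : listener;
//      // connect (or reconnect) to the cluster, then send the shards request to `wrapped`
//  }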
public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
@ -621,53 +717,53 @@ public class RemoteClusterConnectionTests extends ESTestCase {
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 3, TimeValue.timeValueMinutes(30));
4, 3, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats);
RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 4, TimeValue.timeValueMinutes(30));
4, 4, TimeValue.timeValueMinutes(30), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster_1",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 3, TimeValue.timeValueMinutes(30));
4, 3, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 3, TimeValue.timeValueMinutes(30));
4, 3, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)),
4, 3, TimeValue.timeValueMinutes(30));
4, 3, TimeValue.timeValueMinutes(30), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 3, TimeValue.timeValueMinutes(325));
4, 3, TimeValue.timeValueMinutes(325), true);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
stats1 = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
5, 3, TimeValue.timeValueMinutes(30));
5, 3, TimeValue.timeValueMinutes(30), false);
assertSerialization(stats1);
assertNotEquals(stats, stats1);
}
private RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException {
private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(Version.CURRENT);
info.writeTo(out);
@ -680,31 +776,59 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
}
public void testRemoteConnectionInfoBwComp() throws IOException {
final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_rc2);
RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster",
Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 80)),
4, 4, new TimeValue(30, TimeUnit.MINUTES), false);
String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIAAAAAAAAAAAAAAA==";
final byte[] data = Base64.getDecoder().decode(encoded);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
RemoteConnectionInfo deserialized = new RemoteConnectionInfo(in);
assertEquals(expected, deserialized);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
deserialized.writeTo(out);
try (StreamInput in2 = StreamInput.wrap(out.bytes().toBytesRef().bytes)) {
in2.setVersion(version);
RemoteConnectionInfo deserialized2 = new RemoteConnectionInfo(in2);
assertEquals(expected, deserialized2);
}
}
}
}
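//The BwC test above relies on skip_unavailable being version-gated on the wire. A
//hedged sketch of that gating (the exact version constant used here is an
//assumption, not taken from the source):
//
//  public void writeTo(StreamOutput out) throws IOException {
//      // ... seeds, http addresses, connection counts, timeout ...
//      if (out.getVersion().onOrAfter(Version.V_6_1_0)) { // hypothetical gate version
//          out.writeBoolean(skipUnavailable);
//      }
//  }
//
//Older nodes never see the flag, so deserializing from a pre-gate stream falls back
//to the default of false, which is what the expected RemoteConnectionInfo asserts.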
public void testRenderConnectionInfoXContent() throws IOException {
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)),
4, 3, TimeValue.timeValueMinutes(30));
4, 3, TimeValue.timeValueMinutes(30), true);
stats = assertSerialization(stats);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
stats.toXContent(builder, null);
builder.endObject();
assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," +
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"}}", builder.string());
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," +
"\"skip_unavailable\":true}}", builder.string());
stats = new RemoteConnectionInfo("some_other_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)),
2, 0, TimeValue.timeValueSeconds(30));
2, 0, TimeValue.timeValueSeconds(30), false);
stats = assertSerialization(stats);
builder = XContentFactory.jsonBuilder();
builder.startObject();
stats.toXContent(builder, null);
builder.endObject();
assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"],"
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"}}",
builder.string());
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," +
"\"skip_unavailable\":false}}", builder.string());
}
private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception {
@ -950,11 +1074,10 @@ public class RemoteClusterConnectionTests extends ESTestCase {
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () ->
updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode())));
assertThat(illegalStateException.getMessage(),
Matchers.startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" +
startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" +
" - {other_cluster_discoverable_node}"));
}
}
}
}
}

View File

@ -21,7 +21,12 @@ package org.elasticsearch.transport;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -29,22 +34,29 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
public class RemoteClusterServiceTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@ -70,6 +82,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
@ -147,7 +160,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i)));
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals));
assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" +
" cluster_1", iae.getMessage());
@ -414,7 +427,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
failLatch.await();
assertNotNull(ex.get());
if (ex.get() instanceof IllegalStateException) {
assertThat(ex.get().getMessage(), Matchers.anyOf(Matchers.equalTo("no seed node left"), Matchers.startsWith
assertThat(ex.get().getMessage(), anyOf(equalTo("no seed node left"), startsWith
("No node available for cluster:")));
} else {
if (ex.get() instanceof TransportException == false) {
@ -429,4 +442,192 @@ public class RemoteClusterServiceTests extends ESTestCase {
}
}
}
public void testCollectSearchShards() throws Exception {
int numClusters = randomIntBetween(2, 10);
MockTransportService[] mockTransportServices = new MockTransportService[numClusters];
DiscoveryNode[] nodes = new DiscoveryNode[numClusters];
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numClusters; i++) {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT);
mockTransportServices[i] = remoteSeedTransport;
DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode();
knownNodes.add(remoteSeedNode);
nodes[i] = remoteSeedNode;
builder.put("search.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString());
remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen()));
}
Settings settings = builder.build();
try {
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) {
assertFalse(remoteClusterService.isCrossClusterSearchEnabled());
remoteClusterService.initializeRemoteClusters();
assertTrue(remoteClusterService.isCrossClusterSearchEnabled());
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertEquals(1, shardsResponse.getNodes().length);
}
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
while (disconnectedNodes.size() < numDisconnectedClusters) {
int i = randomIntBetween(0, numClusters - 1);
if (disconnectedNodes.add(nodes[i])) {
assertTrue(disconnectedNodesIndices.add(i));
}
}
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
service.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
if (disconnectedNodes.remove(node)) {
disconnectedLatch.countDown();
}
}
});
for (DiscoveryNode disconnectedNode : disconnectedNodes) {
service.addFailToSendNoConnectRule(disconnectedNode.getAddress());
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(TransportException.class));
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
}
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
for (int i : disconnectedNodesIndices) {
remoteClusterService.updateSkipUnavailable("remote" + i, true);
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
if (disconnectedNodesIndices.contains(i)) {
assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY);
} else {
assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY);
}
}
}
//give the transport service enough time to realize that the node is down and to notify the connection listeners,
//so that RemoteClusterConnection is left with no connected nodes and will retry connecting the next time it is used
assertTrue(disconnectedLatch.await(1, TimeUnit.SECONDS));
service.clearAllRules();
if (randomBoolean()) {
for (int i : disconnectedNodesIndices) {
if (randomBoolean()) {
remoteClusterService.updateSkipUnavailable("remote" + i, true);
}
}
}
{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(failure.get());
assertNotNull(response.get());
Map<String, ClusterSearchShardsResponse> map = response.get();
assertEquals(numClusters, map.size());
for (int i = 0; i < numClusters; i++) {
String clusterAlias = "remote" + i;
assertTrue(map.containsKey(clusterAlias));
ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias);
assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY);
}
}
}
}
} finally {
for (MockTransportService mockTransportService : mockTransportServices) {
mockTransportService.close();
}
}
}
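//Hedged sketch of the fan-out that testCollectSearchShards drives (illustrative
//names, not the exact production code): one listener per remote cluster, a shared
//countdown, completion once every cluster has either responded or been skipped, and
//any hard failure wrapped in the "unable to communicate with remote cluster"
//message asserted above:
//
//  CountDown countDown = new CountDown(remoteIndicesByCluster.size());
//  Map<String, ClusterSearchShardsResponse> responses = new ConcurrentHashMap<>();
//  AtomicReference<TransportException> failure = new AtomicReference<>();
//  for (String clusterAlias : remoteIndicesByCluster.keySet()) {
//      getRemoteClusterConnection(clusterAlias).fetchSearchShards(request, ActionListener.wrap(
//          response -> {
//              responses.put(clusterAlias, response); // EMPTY when the cluster was skipped
//              if (countDown.countDown()) {
//                  TransportException f = failure.get();
//                  if (f == null) listener.onResponse(responses); else listener.onFailure(f);
//              }
//          },
//          e -> {
//              failure.compareAndSet(null, new TransportException(
//                  "unable to communicate with remote cluster [" + clusterAlias + "]", e));
//              if (countDown.countDown()) listener.onFailure(failure.get());
//          }));
//  }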
public void testRemoteClusterSkipIfDisconnectedSetting() {
{
Settings settings = Settings.builder()
.put("search.remote.foo.skip_unavailable", true)
.put("search.remote.bar.skip_unavailable", false).build();
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
}
{
Settings brokenSettings = Settings.builder()
.put("search.remote.foo.skip_unavailable", "broken").build();
expectThrows(IllegalArgumentException.class, () ->
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettings)
.forEach(setting -> setting.get(brokenSettings)));
}
AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY,
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
{
Settings settings = Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true));
assertEquals("Missing required setting [search.remote.foo.seeds] for setting [search.remote.foo.skip_unavailable]",
iae.getMessage());
}
{
try (MockTransportService remoteSeedTransport = startTransport("seed", new CopyOnWriteArrayList<>(), Version.CURRENT)) {
String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString();
service.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean())
.put("search.remote.foo.seeds", seed).build(), true);
service.validate(Settings.builder().put("search.remote.foo.seeds", seed).build(), true);
AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("search.remote.foo.seeds", seed).build(),
new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)));
service2.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build(), false);
}
}
}
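//Hedged sketch of how a per-cluster boolean setting can be declared with a
//dependency on the corresponding seeds setting, which is what produces the
//"Missing required setting" validation error asserted above (illustrative; the
//production constant is RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE):
//
//  public static final Setting.AffixSetting<Boolean> SKIP_UNAVAILABLE_SKETCH =
//      Setting.affixKeySetting("search.remote.", "skip_unavailable",
//          key -> Setting.boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic),
//          RemoteClusterAware.REMOTE_CLUSTERS_SEEDS); // seeds must exist for the same alias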
}

View File

@ -110,9 +110,10 @@ buildRestTests.setups['host'] = '''
- set: { master_node: master }
- do:
nodes.info:
metric: [ http ]
metric: [ http, transport ]
- is_true: nodes.$master.http.publish_address
- set: {nodes.$master.http.publish_address: host}
- set: {nodes.$master.transport.publish_address: transport_host}
'''
buildRestTests.setups['node'] = '''

View File

@ -33,3 +33,7 @@ the configured remote cluster alias.
`initial_connect_timeout`::
The initial connect timeout for remote cluster connections.
`skip_unavailable`::
Whether the remote cluster is skipped when it is targeted by a cross
cluster search request but none of its nodes are available.

View File

@ -69,6 +69,11 @@ PUT _cluster/settings
"seeds": [
"127.0.0.1:9301"
]
},
"cluster_three": {
"seeds": [
"127.0.0.1:9302"
]
}
}
}
@ -76,6 +81,46 @@ PUT _cluster/settings
}
--------------------------------
// CONSOLE
// TEST[setup:host]
// TEST[s/127.0.0.1:9300/\${transport_host}/]
//////////////////////////
We want to be sure that settings have been updated,
because we'll use them later.
[source,js]
--------------------------------------------------
{
"acknowledged" : true,
"persistent": {
"search": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
},
"cluster_three": {
"seeds": [
"127.0.0.1:9302"
]
}
}
}
},
"transient" : {}
}
--------------------------------------------------
// TESTRESPONSE[s/127.0.0.1:9300/\${transport_host}/]
//////////////////////////
A remote cluster can be deleted from the cluster settings by setting its seeds to `null`:
@ -86,7 +131,7 @@ PUT _cluster/settings
"persistent": {
"search": {
"remote": {
"cluster_one": {
"cluster_three": {
"seeds": null <1>
}
}
@ -95,41 +140,104 @@ PUT _cluster/settings
}
--------------------------------
// CONSOLE
<1> `cluster_one` would be removed from the cluster settings, leaving `cluster_two` intact.
// TEST[continued]
<1> `cluster_three` would be removed from the cluster settings, leaving `cluster_one` and `cluster_two` intact.
//////////////////////////
We want to be sure that settings have been updated,
because we'll use them later.
[source,js]
--------------------------------------------------
{
"acknowledged" : true,
"persistent" : {},
"transient" : {}
}
--------------------------------------------------
// TESTRESPONSE
//////////////////////////
[float]
=== Using cross cluster search
To search the `twitter` index on remote cluster `cluster_1` the index name must be prefixed with the cluster alias
separated by a `:` character:
To search the `twitter` index on remote cluster `cluster_one` the index name
must be prefixed with the cluster alias separated by a `:` character:
[source,js]
--------------------------------------------------
POST /cluster_one:twitter/tweet/_search
GET /cluster_one:twitter/tweet/_search
{
"query": {
"match_all": {}
"match": {
"user": "kimchy"
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[skip:we don't have two clusters set up during docs testing]
// TEST[continued]
// TEST[setup:twitter]
[source,js]
--------------------------------------------------
{
"took": 150,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0,
"skipped": 0
},
"_clusters": {
"total": 1,
"successful": 1,
"skipped": 0
},
"hits": {
"total": 1,
"max_score": 1,
"hits": [
{
"_index": "cluster_one:twitter",
"_type": "tweet",
"_id": "0",
"_score": 1,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
}
]
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 150/"took": "$body.took"/]
// TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/]
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
In contrast to the `tribe` feature, cross cluster search can also search indices with the same name on different
clusters:
[source,js]
--------------------------------------------------
POST /cluster_one:twitter,twitter/tweet/_search
GET /cluster_one:twitter,twitter/tweet/_search
{
"query": {
"match_all": {}
"match": {
"user": "kimchy"
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[skip:we don't have two clusters set up during docs testing]
// TEST[continued]
Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names
are identical, these indices will be treated as different indices when results are merged. All results retrieved from a
@ -139,44 +247,146 @@ will be prefixed with their remote cluster name:
[source,js]
--------------------------------------------------
{
"took" : 89,
"timed_out" : false,
"_shards" : {
"total" : 10,
"successful" : 10,
"failed" : 0
"took": 150,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0,
"skipped": 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
"_clusters": {
"total": 2,
"successful": 2,
"skipped": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index" : "cluster_one:twitter",
"_type" : "tweet",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "kimchy",
"postDate" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
"_index": "cluster_one:twitter",
"_type": "tweet",
"_id": "0",
"_score": 1,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
},
{
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"user" : "kimchy",
"postDate" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
"_index": "twitter",
"_type": "tweet",
"_id": "0",
"_score": 2,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
}
]
}
}
--------------------------------------------------
// TESTRESPONSE
// TESTRESPONSE[s/"took": 150/"took": "$body.took"/]
// TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/]
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
[float]
=== Skipping disconnected clusters
By default all remote clusters that are searched via Cross Cluster Search need to be available when
the search request is executed, otherwise the whole request fails and no search results are returned
even if some of the clusters are available. Remote clusters can be made optional through the
boolean `skip_unavailable` setting, which defaults to `false`.
[source,js]
--------------------------------
PUT _cluster/settings
{
"persistent": {
"search.remote.cluster_two.skip_unavailable": true <1>
}
}
--------------------------------
// CONSOLE
// TEST[continued]
<1> `cluster_two` is made optional
[source,js]
--------------------------------------------------
GET /cluster_one:twitter,cluster_two:twitter,twitter/tweet/_search <1>
{
"query": {
"match": {
"user": "kimchy"
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
<1> Search against the `twitter` index in `cluster_one`, `cluster_two` and also locally
[source,js]
--------------------------------------------------
{
"took": 150,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0,
"skipped": 0
},
"_clusters": { <1>
"total": 3,
"successful": 2,
"skipped": 1
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "cluster_one:twitter",
"_type": "tweet",
"_id": "0",
"_score": 1,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
},
{
"_index": "twitter",
"_type": "tweet",
"_id": "0",
"_score": 2,
"_source": {
"user": "kimchy",
"date": "2009-11-15T14:12:12",
"message": "trying out Elasticsearch",
"likes": 0
}
}
]
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 150/"took": "$body.took"/]
// TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/]
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]
// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/]
<1> The `_clusters` section indicates that one cluster was unavailable and got skipped
[float]
[[cross-cluster-search-settings]]
@ -205,6 +415,14 @@ will be prefixed with their remote cluster name:
remote clusters. Cross-cluster search requests must be sent to a node that
is allowed to act as a cross-cluster client.
`search.remote.${cluster_alias}.skip_unavailable`::
Per cluster boolean setting that allows skipping specific clusters when no
nodes belonging to them are available and they are targeted as part of a
cross cluster search request. Defaults to `false`, meaning that all clusters
are mandatory by default; individual clusters can selectively be made
optional by setting this to `true`.
[float]
[[retrieve-remote-clusters-info]]
=== Retrieving remote clusters info

View File

@ -455,7 +455,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap());
SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0);
InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, 0, randomLong(), null);
SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, 0, randomLong(), null,
SearchResponse.Clusters.EMPTY);
if (randomBoolean()) {
client.lastScroll.get().listener.onResponse(searchResponse);

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.test-with-dependencies'
dependencies {
testCompile project(path: ':client:rest-high-level', configuration: 'runtime')
}

View File

@ -0,0 +1,319 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.containsString;
public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
private static RestHighLevelClient restHighLevelClient;
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
@Before
public void initHighLevelClient() throws IOException {
super.initClient();
if (restHighLevelClient == null) {
restHighLevelClient = new HighLevelClient(client());
}
}
@AfterClass
public static void cleanupClient() throws IOException {
restHighLevelClient.close();
restHighLevelClient = null;
}
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
private static MockTransportService startTransport(
final String id,
final List<DiscoveryNode> knownNodes,
final Version version,
final ThreadPool threadPool) {
boolean success = false;
final Settings s = Settings.builder().put("node.name", id).build();
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s);
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);
}
ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build();
channel.sendResponse(new ClusterStateResponse(clusterName, build, 0L));
});
newService.start();
newService.acceptIncomingRequests();
success = true;
return newService;
} finally {
if (success == false) {
newService.close();
}
}
}
public void testSearchSkipUnavailable() throws IOException {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString()));
for (int i = 0; i < 10; i++) {
restHighLevelClient.index(new IndexRequest("index", "doc", String.valueOf(i)).source("field", "value"));
}
Response refreshResponse = client().performRequest("POST", "/index/_refresh");
assertEquals(200, refreshResponse.getStatusLine().getStatusCode());
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index"));
assertSame(SearchResponse.Clusters.EMPTY, response.getClusters());
assertEquals(10, response.getHits().totalHits);
assertEquals(10, response.getHits().getHits().length);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index"));
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(10, response.getHits().totalHits);
assertEquals(10, response.getHits().getHits().length);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index"));
assertEquals(1, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(0, response.getHits().totalHits);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index").scroll("1m"));
assertEquals(2, response.getClusters().getTotal());
assertEquals(2, response.getClusters().getSuccessful());
assertEquals(0, response.getClusters().getSkipped());
assertEquals(10, response.getHits().totalHits);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();
SearchResponse scrollResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(scrollId));
assertSame(SearchResponse.Clusters.EMPTY, scrollResponse.getClusters());
assertEquals(10, scrollResponse.getHits().totalHits);
assertEquals(0, scrollResponse.getHits().getHits().length);
}
remoteTransport.close();
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", true));
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index"));
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(10, response.getHits().totalHits);
assertEquals(10, response.getHits().getHits().length);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index"));
assertEquals(1, response.getClusters().getTotal());
assertEquals(0, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(0, response.getHits().totalHits);
}
{
SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index").scroll("1m"));
assertEquals(2, response.getClusters().getTotal());
assertEquals(1, response.getClusters().getSuccessful());
assertEquals(1, response.getClusters().getSkipped());
assertEquals(10, response.getHits().totalHits);
assertEquals(10, response.getHits().getHits().length);
String scrollId = response.getScrollId();
SearchResponse scrollResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(scrollId));
assertSame(SearchResponse.Clusters.EMPTY, scrollResponse.getClusters());
assertEquals(10, scrollResponse.getHits().totalHits);
assertEquals(0, scrollResponse.getHits().getHits().length);
}
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", false));
assertSearchConnectFailure();
Map<String, Object> map = new HashMap<>();
map.put("seeds", null);
map.put("skip_unavailable", null);
updateRemoteClusterSettings(map);
}
}
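//Note the scroll assertions above: the _clusters section is only returned for the
//initial search request, so subsequent scroll rounds report Clusters.EMPTY even
//when the initial request skipped a cluster.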
public void testSkipUnavailableDependsOnSeeds() throws IOException {
try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) {
DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode();
{
//check that skip_unavailable alone cannot be set
HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody(
Collections.singletonMap("skip_unavailable", randomBoolean()));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(),
containsString("Missing required setting [search.remote.remote1.seeds] " +
"for setting [search.remote.remote1.skip_unavailable]"));
}
Map<String, Object> settingsMap = new HashMap<>();
settingsMap.put("seeds", remoteNode.getAddress().toString());
settingsMap.put("skip_unavailable", randomBoolean());
updateRemoteClusterSettings(settingsMap);
{
//check that seeds cannot be reset alone if skip_unavailable is set
HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null));
ResponseException responseException = expectThrows(ResponseException.class,
() -> client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("Missing required setting [search.remote.remote1.seeds] " +
"for setting [search.remote.remote1.skip_unavailable]"));
}
if (randomBoolean()) {
updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null));
updateRemoteClusterSettings(Collections.singletonMap("seeds", null));
} else {
Map<String, Object> nullMap = new HashMap<>();
nullMap.put("seeds", null);
nullMap.put("skip_unavailable", null);
updateRemoteClusterSettings(nullMap);
}
}
}
private static void assertSearchConnectFailure() {
{
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.search(new SearchRequest("index", "remote1:index")));
ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause();
assertThat(rootCause.getMessage(), containsString("connect_exception"));
}
{
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.search(new SearchRequest("remote1:index")));
ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause();
assertThat(rootCause.getMessage(), containsString("connect_exception"));
}
{
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> restHighLevelClient.search(new SearchRequest("remote1:index").scroll("1m")));
ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause();
assertThat(rootCause.getMessage(), containsString("connect_exception"));
}
}
private static void updateRemoteClusterSettings(Map<String, Object> settings) throws IOException {
HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody(settings);
Response response = client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity);
assertEquals(200, response.getStatusLine().getStatusCode());
}
private static HttpEntity buildUpdateSettingsRequestBody(Map<String, Object> settings) throws IOException {
String requestBody;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.startObject("persistent");
{
builder.startObject("search.remote.remote1");
{
for (Map.Entry<String, Object> entry : settings.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
requestBody = builder.string();
}
return new NStringEntity(requestBody, ContentType.APPLICATION_JSON);
}
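//For example, buildUpdateSettingsRequestBody(Collections.singletonMap("skip_unavailable", true))
//yields a request body like:
//  { "persistent": { "search.remote.remote1": { "skip_unavailable": true } } }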
private static class HighLevelClient extends RestHighLevelClient {
private HighLevelClient(RestClient restClient) {
super(restClient, (client) -> {}, Collections.emptyList());
}
}
}

View File

@ -56,3 +56,52 @@
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- match: { test_remote_cluster.initial_connect_timeout: "30s" }
---
"skip_unavailable is returned as part of _remote/info response":
- skip:
#TODO update versions once backported
version: " - 7.0.0"
reason: "skip_unavailable is only returned from 7.0.0 on"
- do:
remote.info: {}
- is_false: my_remote_cluster.skip_unavailable
- do:
cluster.put_settings:
body:
transient:
search.remote.my_remote_cluster.skip_unavailable: true
- is_true: transient.search.remote.my_remote_cluster.skip_unavailable
- do:
remote.info: {}
- is_true: my_remote_cluster.skip_unavailable
- do:
cluster.put_settings:
body:
transient:
search.remote.my_remote_cluster.skip_unavailable: false
- is_false: transient.search.remote.my_remote_cluster.skip_unavailable
- do:
remote.info: {}
- is_false: my_remote_cluster.skip_unavailable
- do:
cluster.put_settings:
body:
transient:
search.remote.my_remote_cluster.skip_unavailable: null
- match: {transient: {}}
- do:
remote.info: {}
- is_false: my_remote_cluster.skip_unavailable

View File

@ -11,6 +11,9 @@
query:
match_all: {}
- match: {_clusters.total: 1}
- match: {_clusters.successful: 1}
- match: {_clusters.skipped: 0}
- set: {_scroll_id: scroll_id}
- match: {hits.total: 6 }
- length: {hits.hits: 4 }
@ -23,6 +26,7 @@
scroll:
body: { "scroll_id": "$scroll_id", "scroll": "1m"}
- is_false: _clusters
- match: {hits.total: 6 }
- length: {hits.hits: 2 }
- match: {hits.hits.0._source.filter_field: 1 }

View File

@ -28,6 +28,7 @@
body: "{ \"query\": { \"match_all\": {} } }"
- gte: { took: 0 }
- is_false: _clusters
- is_true: _shards.total
- is_true: hits.total
- is_true: hits.hits.0._index

View File

@ -66,6 +66,7 @@ List projects = [
'plugins:jvm-example',
'plugins:store-smb',
'qa:auto-create-index',
'qa:ccs-unavailable-clusters',
'qa:evil-tests',
'qa:full-cluster-restart',
'qa:integration-bwc',