Add support for multi cluster search

The search api looks at the indices that the request relates to; if an index name contains a certain separator ("|" at the moment, to be better defined in the future), it is split into two parts, where the first portion is the name of a remote cluster and the second part is the index expression. Remote clusters are defined as dynamic cluster settings. There are some TODOs and open questions, but the main functionality works.
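As a rough usage sketch (the cluster name, seed addresses, and index names below are made up for illustration, not taken from this commit): seeds are registered per remote cluster under the action.search.remote. group setting, and a request targets a remote index by prefixing it with the cluster name and the separator:

//hypothetical configuration of two seed nodes for a remote cluster called "cluster_one"
Settings settings = Settings.builder()
    .putArray("action.search.remote.cluster_one", "10.0.0.1:9300", "10.0.0.2:9300")
    .build();
//searches the local index "logs" plus "metrics-*" on the remote cluster "cluster_one"
SearchRequest request = new SearchRequest("logs", "cluster_one|metrics-*");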
javanna 2016-11-21 17:03:49 +01:00 committed by Luca Cavanna
parent ac2aa56350
commit b440ea946f
5 changed files with 294 additions and 16 deletions

core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

@@ -19,16 +19,25 @@
package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchResult;
@@ -44,13 +53,23 @@ import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
/**
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
@@ -71,11 +90,147 @@ public class SearchTransportService extends AbstractComponent {
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
//TODO what should the setting name be?
public static final Setting<Settings> REMOTE_CLUSTERS_SEEDS = Setting.groupSetting("action.search.remote.",
SearchTransportService::validateRemoteClustersSeeds,
Setting.Property.NodeScope,
Setting.Property.Dynamic);
private final TransportService transportService;
private volatile Map<String, List<DiscoveryNode>> remoteClustersSeeds;
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
this.transportService = transportService;
setRemoteClustersSeeds(REMOTE_CLUSTERS_SEEDS.get(settings));
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTERS_SEEDS, this::setRemoteClustersSeeds,
SearchTransportService::validateRemoteClustersSeeds);
}
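//validates the action.search.remote group setting: every remote cluster entry needs at
//least one seed address in host:port form with a numeric port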
private static void validateRemoteClustersSeeds(Settings settings) {
//TODO add a static whitelist like in reindex from remote
for (String clusterName : settings.names()) {
String[] remoteHosts = settings.getAsArray(clusterName);
if (remoteHosts.length == 0) {
throw new IllegalArgumentException("no hosts set for remote cluster [" + clusterName + "], at least one host is required");
}
for (String remoteHost : remoteHosts) {
String[] strings = remoteHost.split(":");
if (strings.length != 2) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] " +
"instead for remote cluster [" + clusterName + "]");
}
try {
Integer.valueOf(strings[1]);
} catch(NumberFormatException e) {
throw new IllegalArgumentException("port must be a number, found [" + strings[1] + "] instead for remote cluster [" +
clusterName + "]");
}
}
}
}
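//turns the validated group setting into a map from cluster name to seed nodes; the nodes
//are only created here, not connected to, so the setting can be applied even while a
//remote cluster is unreachable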
private void setRemoteClustersSeeds(Settings settings) {
Map<String, List<DiscoveryNode>> remoteClustersNodes = new HashMap<>();
for (String clusterName : settings.names()) {
String[] remoteHosts = settings.getAsArray(clusterName);
for (String remoteHost : remoteHosts) {
String[] strings = remoteHost.split(":");
String host = strings[0];
int port = Integer.valueOf(strings[1]);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + remoteHost,
new TransportAddress(new InetSocketAddress(host, port)), Version.CURRENT.minimumCompatibilityVersion());
//don't connect yet as that would require the remote node to be up and would fail the local node startup otherwise
List<DiscoveryNode> nodes = remoteClustersNodes.get(clusterName);
if (nodes == null) {
nodes = new ArrayList<>();
remoteClustersNodes.put(clusterName, nodes);
}
nodes.add(node);
}
}
remoteClustersSeeds = remoteClustersNodes;
}
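//resolves a configured cluster name to one of its seed nodes, chosen at random, and
//connects to it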
private DiscoveryNode connectToRemoteCluster(String clusterName) {
List<DiscoveryNode> nodes = remoteClustersSeeds.get(clusterName);
if (nodes == null) {
throw new IllegalArgumentException("no remote cluster configured with name [" + clusterName + "]");
}
DiscoveryNode node = nodes.get(Randomness.get().nextInt(nodes.size()));
//TODO we just take a random host for now, implement fallback in case of connect failure
try {
//TODO we should call liveness api and get back an updated discovery node. that would have the updated version
// and would make the search shards call more future-proof. Also validating the cluster name may be a thing.
connectToRemoteNode(node);
} catch(ConnectTransportException e) {
throw new ConnectTransportException(node, "unable to connect to remote cluster [" + clusterName + "]", e);
}
return node;
}
void connectToRemoteNode(DiscoveryNode remoteNode) {
//TODO should the list of seeds get updated based on nodes that we get back from the remote cluster through search_shards?
transportService.connectToNode(remoteNode);
//TODO is it ok to connect and leave the node connected? It will be pinged from now on?
}
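//fans out a cluster_search_shards request to one node per remote cluster and notifies
//the listener once every cluster has either responded or failed; failures from multiple
//clusters are merged into a single TransportException via suppression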
void sendSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
final DiscoveryNode node = connectToRemoteCluster(clusterName);
final List<String> indices = entry.getValue();
//local true so we don't go to the master for each single remote search
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
.routing(searchRequest.routing());
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@Override
public ClusterSearchShardsResponse newInstance() {
return new ClusterSearchShardsResponse();
}
@Override
public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
TransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
listener.onFailure(transportException.get());
}
}
}
@Override
public void handleException(TransportException e) {
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
return current;
});
}
if (responsesCountDown.countDown()) {
listener.onFailure(exception);
}
}
@Override
public String executor() {
return ThreadPool.Names.SEARCH;
}
});
}
}
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {

core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

@@ -20,27 +20,39 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
@@ -55,6 +67,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
"action.search.shard_count.limit", 1000L, 1L, Property.Dynamic, Property.NodeScope);
private static final char REMOTE_CLUSTER_INDEX_SEPARATOR = '|';
private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController;
@@ -73,7 +87,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
this.searchService = searchService;
}
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState,
Index[] concreteIndices, String[] remoteUUIDs) {
final Map<String, AliasFilter> aliasFilterMap = new HashMap<>();
for (Index index : concreteIndices) {
clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index.getName());
@@ -81,6 +96,11 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
assert aliasFilter != null;
aliasFilterMap.put(index.getUUID(), aliasFilter);
}
//TODO this is just a temporary workaround, alias filters need to be retrieved, at the moment they are ignored for remote indices
//they will be retrieved from search_shards from 5.1 on.
for (String remoteUUID : remoteUUIDs) {
aliasFilterMap.put(remoteUUID, new AliasFilter(null, Strings.EMPTY_ARRAY));
}
return aliasFilterMap;
}
@@ -88,23 +108,92 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// pure paranoia if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
//TODO make selection smarter: aliases can still contain any character and remote indices should have the precedence all the time.
//e.g. we could skip the remote logic if no remote clusters are registered. Also don't go remotely if the prefix is not
//a registered cluster rather than throwing an error?
final List<String> localIndicesList = new ArrayList<>();
final Map<String, List<String>> remoteIndicesByCluster = new HashMap<>();
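//split the requested indices into plain local names and per-cluster groups of remote
//index expressions, based on the separator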
for (String index : searchRequest.indices()) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
String remoteCluster = index.substring(0, i);
String remoteIndex = index.substring(i + 1);
List<String> indices = remoteIndicesByCluster.get(remoteCluster);
if (indices == null) {
indices = new ArrayList<>();
remoteIndicesByCluster.put(remoteCluster, indices);
}
indices.add(remoteIndex);
} else {
localIndicesList.add(index);
}
}
String[] localIndices = localIndicesList.toArray(new String[localIndicesList.size()]);
if (remoteIndicesByCluster.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices,
Strings.EMPTY_ARRAY, Collections.emptyList(), Collections.emptySet(), listener);
} else {
searchTransportService.sendSearchShards(searchRequest, remoteIndicesByCluster,
new ActionListener<Map<String, ClusterSearchShardsResponse>>() {
@Override
public void onResponse(Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
List<ShardIterator> remoteShardIterators = new ArrayList<>();
Set<DiscoveryNode> remoteNodes = new HashSet<>();
Set<String> remoteUUIDs = new HashSet<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterName = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
Collections.addAll(remoteNodes, searchShardsResponse.getNodes());
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR +
clusterSearchShardsGroup.getShardId().getIndex().getName(),
clusterSearchShardsGroup.getShardId().getIndex().getUUID());
ShardId shardId = new ShardId(index, clusterSearchShardsGroup.getShardId().getId());
ShardIterator shardIterator = new PlainShardIterator(shardId,
Arrays.asList(clusterSearchShardsGroup.getShards()));
remoteShardIterators.add(shardIterator);
remoteUUIDs.add(clusterSearchShardsGroup.getShardId().getIndex().getUUID());
}
}
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices,
remoteUUIDs.toArray(new String[remoteUUIDs.size()]), remoteShardIterators, remoteNodes, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
String[] remoteUUIDs, List<ShardIterator> remoteShardIterators, Set<DiscoveryNode> remoteNodes,
ActionListener<SearchResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
Index[] indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, localIndices);
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteUUIDs);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
String[] concreteIndices = new String[indices.length];
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
// optimize search type for cases where there is only one shard group to search on
@@ -124,8 +213,38 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
}
Function<String, DiscoveryNode> nodesLookup = mergeNodesLookup(clusterState.nodes(), remoteNodes);
searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, nodesLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), listener).start();
}
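//concatenates the remote shard iterators with the local ones so the async search action
//iterates over shards from every involved cluster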
private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator,
List<ShardIterator> remoteShardIterators) {
if (remoteShardIterators.isEmpty()) {
return localShardsIterator;
}
List<ShardIterator> shards = new ArrayList<>();
for (ShardIterator shardIterator : remoteShardIterators) {
shards.add(shardIterator);
}
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(shardIterator);
}
return new GroupShardsIterator(shards);
}
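//extends the local cluster state node lookup with the remote nodes returned by
//search_shards, connecting to each remote node so that shard level requests can be sent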
private Function<String, DiscoveryNode> mergeNodesLookup(DiscoveryNodes nodes, Set<DiscoveryNode> remoteNodes) {
if (remoteNodes.isEmpty()) {
return nodes::get;
}
ImmutableOpenMap.Builder<String, DiscoveryNode> builder = ImmutableOpenMap.builder(nodes.getNodes());
for (DiscoveryNode remoteNode : remoteNodes) {
//TODO shall we catch connect exceptions here? Otherwise we will return an error but we could rather return partial results?
searchTransportService.connectToRemoteNode(remoteNode);
builder.put(remoteNode.getId(), remoteNode);
}
return builder.build()::get;
}
@Override
@@ -134,10 +253,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators,
long startTime, Function<String, DiscoveryNode> nodesLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
ActionListener<SearchResponse> listener) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
@@ -167,7 +285,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return searchAsyncAction;
}
private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
if (shardCount > shardCountLimit) {
throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of "

core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -19,6 +19,7 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
@@ -54,9 +55,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayService;
@@ -255,6 +256,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
SearchTransportService.REMOTE_CLUSTERS_SEEDS,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,

core/src/main/java/org/elasticsearch/node/Node.java

@@ -426,7 +426,8 @@ public class Node implements Closeable {
b.bind(IndicesService.class).toInstance(indicesService);
b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase()));
b.bind(SearchTransportService.class).toInstance(new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService));
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
scriptModule.getScriptService()));
b.bind(Transport.class).toInstance(transport);

core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
@@ -76,7 +77,8 @@ public class SearchAsyncActionTests extends ESTestCase {
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(SearchTransportService.REMOTE_CLUSTERS_SEEDS)), null) {
@Override
public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest request) {
numFreedContext.incrementAndGet();