Add cross cluster support to `_field_caps` (#24463)

To support kibana this commit adds an internal optimization
to support the cross cluster syntax for indices on the `_field_caps`
API.

Closes #24334
This commit is contained in:
Simon Willnauer 2017-05-04 11:44:54 +02:00 committed by GitHub
parent 9a3ab3e800
commit 14e57bf9f8
13 changed files with 347 additions and 124 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Map;
@ -29,7 +30,7 @@ import java.util.Map;
/**
* Response for {@link FieldCapabilitiesIndexRequest} requests.
*/
public class FieldCapabilitiesIndexResponse extends ActionResponse {
public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private String indexName;
private Map<String, FieldCapabilities> responseMap;
@ -41,6 +42,10 @@ public class FieldCapabilitiesIndexResponse extends ActionResponse {
FieldCapabilitiesIndexResponse() {
}
FieldCapabilitiesIndexResponse(StreamInput input) throws IOException {
this.readFrom(input);
}
/**
* Get the index name

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
@ -38,13 +39,14 @@ import java.util.Set;
import static org.elasticsearch.common.xcontent.ObjectParser.fromList;
public class FieldCapabilitiesRequest extends ActionRequest
implements IndicesRequest.Replaceable {
public final class FieldCapabilitiesRequest extends ActionRequest implements IndicesRequest.Replaceable {
public static final ParseField FIELDS_FIELD = new ParseField("fields");
public static final String NAME = "field_caps_request";
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
private String[] fields = Strings.EMPTY_ARRAY;
// pkg private API mainly for cross cluster search to signal that we do multiple reductions ie. the results should not be merged
private boolean mergeResults = true;
private static ObjectParser<FieldCapabilitiesRequest, Void> PARSER =
new ObjectParser<>(NAME, FieldCapabilitiesRequest::new);
@ -56,16 +58,39 @@ public class FieldCapabilitiesRequest extends ActionRequest
public FieldCapabilitiesRequest() {}
/**
* Returns <code>true</code> iff the results should be merged.
*/
boolean isMergeResults() {
return mergeResults;
}
/**
* if set to <code>true</code> the response will contain only a merged view of the per index field capabilities. Otherwise only
* unmerged per index field capabilities are returned.
*/
void setMergeResults(boolean mergeResults) {
this.mergeResults = mergeResults;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
mergeResults = in.readBoolean();
} else {
mergeResults = true;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(fields);
if (out.getVersion().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
out.writeBoolean(mergeResults);
}
}
public static FieldCapabilitiesRequest parseFields(XContentParser parser) throws IOException {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -27,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
@ -34,9 +36,20 @@ import java.util.Map;
*/
public class FieldCapabilitiesResponse extends ActionResponse implements ToXContent {
private Map<String, Map<String, FieldCapabilities>> responseMap;
private List<FieldCapabilitiesIndexResponse> indexResponses;
FieldCapabilitiesResponse(Map<String, Map<String, FieldCapabilities>> responseMap) {
this(responseMap, Collections.emptyList());
}
FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses) {
this(Collections.emptyMap(), indexResponses);
}
private FieldCapabilitiesResponse(Map<String, Map<String, FieldCapabilities>> responseMap,
List<FieldCapabilitiesIndexResponse> indexResponses) {
this.responseMap = responseMap;
this.indexResponses = indexResponses;
}
/**
@ -53,6 +66,13 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
return responseMap;
}
/**
* Returns the actual per-index field caps responses
*/
List<FieldCapabilitiesIndexResponse> getIndexResponses() {
return indexResponses;
}
/**
*
* Get the field capabilities per type for the provided {@code field}.
@ -66,6 +86,11 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
super.readFrom(in);
this.responseMap =
in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
} else {
indexResponses = Collections.emptyList();
}
}
private static Map<String, FieldCapabilities> readField(StreamInput in) throws IOException {
@ -76,6 +101,10 @@ public class FieldCapabilitiesResponse extends ActionResponse implements ToXCont
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeList(indexResponses);
}
}
private static void writeField(StreamOutput out,

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.fieldcaps;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
@ -27,18 +28,27 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class TransportFieldCapabilitiesAction
extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final TransportService transportService;
@Inject
public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
@ -50,71 +60,97 @@ public class TransportFieldCapabilitiesAction
super(settings, FieldCapabilitiesAction.NAME, threadPool, transportService,
actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.transportService = transportService;
this.shardAction = shardAction;
}
@Override
protected void doExecute(FieldCapabilitiesRequest request,
final ActionListener<FieldCapabilitiesResponse> listener) {
ClusterState clusterState = clusterService.state();
String[] concreteIndices =
indexNameExpressionResolver.concreteIndexNames(clusterState, request);
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(concreteIndices.length);
final AtomicReferenceArray<Object> indexResponses =
new AtomicReferenceArray<>(concreteIndices.length);
if (concreteIndices.length == 0) {
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
final CountDown completionCounter = new CountDown(totalNumRequest);
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
final Runnable onResponse = () -> {
if (completionCounter.countDown()) {
if (request.isMergeResults()) {
listener.onResponse(merge(indexResponses));
} else {
listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
}
}
};
if (totalNumRequest == 0) {
listener.onResponse(new FieldCapabilitiesResponse());
} else {
ActionListener<FieldCapabilitiesIndexResponse> innerListener = new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
indexResponses.add(result);
onResponse.run();
}
@Override
public void onFailure(Exception e) {
// TODO we should somehow inform the user that we failed
onResponse.run();
}
};
for (String index : concreteIndices) {
FieldCapabilitiesIndexRequest indexRequest =
new FieldCapabilitiesIndexRequest(request.fields(), index);
shardAction.execute(indexRequest,
new ActionListener<FieldCapabilitiesIndexResponse> () {
shardAction.execute(new FieldCapabilitiesIndexRequest(request.fields(), index), innerListener);
}
// this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
// send us back all individual index results.
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Transport.Connection connection = remoteClusterService.getConnection(remoteIndices.getKey());
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
indexResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(merge(indexResponses));
}
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}
@Override
public void onFailure(Exception e) {
indexResponses.set(indexCounter.getAndIncrement(), e);
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(merge(indexResponses));
public void handleResponse(FieldCapabilitiesResponse response) {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.buildRemoteIndexName(clusterAlias,
res.getIndexName()), res.get()));
}
onResponse.run();
}
@Override
public void handleException(TransportException exp) {
onResponse.run();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
}
private FieldCapabilitiesResponse merge(AtomicReferenceArray<Object> indexResponses) {
private FieldCapabilitiesResponse merge(List<FieldCapabilitiesIndexResponse> indexResponses) {
Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder = new HashMap<> ();
for (int i = 0; i < indexResponses.length(); i++) {
Object element = indexResponses.get(i);
if (element instanceof FieldCapabilitiesIndexResponse == false) {
assert element instanceof Exception;
continue;
}
FieldCapabilitiesIndexResponse response = (FieldCapabilitiesIndexResponse) element;
for (String field : response.get().keySet()) {
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.get(field);
if (typeMap == null) {
typeMap = new HashMap<> ();
responseMapBuilder.put(field, typeMap);
}
FieldCapabilities fieldCap = response.getField(field);
FieldCapabilities.Builder builder = typeMap.get(fieldCap.getType());
if (builder == null) {
builder = new FieldCapabilities.Builder(field, fieldCap.getType());
typeMap.put(fieldCap.getType(), builder);
}
builder.add(response.getIndexName(),
fieldCap.isSearchable(), fieldCap.isAggregatable());
}
for (FieldCapabilitiesIndexResponse response : indexResponses) {
innerMerge(responseMapBuilder, response.getIndexName(), response.get());
}
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
@ -131,4 +167,16 @@ public class TransportFieldCapabilitiesAction
return new FieldCapabilitiesResponse(responseMap);
}
private void innerMerge(Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder, String indexName,
Map<String, FieldCapabilities> map) {
for (Map.Entry<String, FieldCapabilities> entry : map.entrySet()) {
final String field = entry.getKey();
final FieldCapabilities fieldCap = entry.getValue();
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.computeIfAbsent(field, f -> new HashMap<>());
FieldCapabilities.Builder builder = typeMap.computeIfAbsent(fieldCap.getType(), key -> new FieldCapabilities.Builder(field,
key));
builder.add(indexName, fieldCap.isSearchable(), fieldCap.isAggregatable());
}
}
}

View File

@ -41,34 +41,19 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class TransportFieldCapabilitiesIndexAction
extends TransportSingleShardAction<FieldCapabilitiesIndexRequest,
public class TransportFieldCapabilitiesIndexAction extends TransportSingleShardAction<FieldCapabilitiesIndexRequest,
FieldCapabilitiesIndexResponse> {
private static final String ACTION_NAME = FieldCapabilitiesAction.NAME + "[index]";
protected final ClusterService clusterService;
private final IndicesService indicesService;
@Inject
public TransportFieldCapabilitiesIndexAction(Settings settings,
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings,
ACTION_NAME,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
FieldCapabilitiesIndexRequest::new,
ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
public TransportFieldCapabilitiesIndexAction(Settings settings, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FieldCapabilitiesIndexRequest::new, ThreadPool.Names.MANAGEMENT);
this.indicesService = indicesService;
}
@ -86,11 +71,8 @@ public class TransportFieldCapabilitiesIndexAction
}
@Override
protected FieldCapabilitiesIndexResponse shardOperation(
final FieldCapabilitiesIndexRequest request,
ShardId shardId) {
MapperService mapperService =
indicesService.indexServiceSafe(shardId.getIndex()).mapperService();
protected FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request, ShardId shardId) {
MapperService mapperService = indicesService.indexServiceSafe(shardId.getIndex()).mapperService();
Set<String> fieldNames = new HashSet<>();
for (String field : request.fields()) {
fieldNames.addAll(mapperService.simpleMatchToIndexNames(field));
@ -98,10 +80,7 @@ public class TransportFieldCapabilitiesIndexAction
Map<String, FieldCapabilities> responseMap = new HashMap<>();
for (String field : fieldNames) {
MappedFieldType ft = mapperService.fullName(field);
FieldCapabilities fieldCap = new FieldCapabilities(field,
ft.typeName(),
ft.isSearchable(),
ft.isAggregatable());
FieldCapabilities fieldCap = new FieldCapabilities(field, ft.typeName(), ft.isSearchable(), ft.isAggregatable());
responseMap.put(field, fieldCap);
}
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), responseMap);
@ -113,9 +92,7 @@ public class TransportFieldCapabilitiesIndexAction
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state,
InternalRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_READ,
request.concreteIndex());
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_READ, request.concreteIndex());
}
}

View File

@ -178,35 +178,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
final OriginalIndices localIndices;
final Map<String, OriginalIndices> remoteClusterIndices;
final ClusterState clusterState = clusterService.state();
if (remoteClusterService.isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = remoteClusterService.groupClusterIndices(searchRequest.indices(),
// empty string is not allowed
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
List<String> remove = groupedIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
String[] indices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
localIndices = new OriginalIndices(indices, searchRequest.indicesOptions());
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), searchRequest.indicesOptions()));
}
remoteClusterIndices = Collections.unmodifiableMap(originalIndicesMap);
} else {
remoteClusterIndices = Collections.emptyMap();
localIndices = new OriginalIndices(searchRequest);
}
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
ActionListener.wrap((searchShardsResponses) -> {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,

View File

@ -159,4 +159,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
throw new IllegalArgumentException("port must be a number", e);
}
}
public static final String buildRemoteIndexName(String clusterAlias, String indexName) {
return clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName;
}
}

View File

@ -152,7 +152,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
*/
public void fetchSearchShards(SearchRequest searchRequest, final String[] indices,
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
ActionListener<ClusterSearchShardsResponse> listener) {
if (connectedNodes.isEmpty()) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
@ -160,18 +160,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
// we can't proceed with a search on a cluster level.
// in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller
// end since they provide the listener.
connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, indices, listener), listener::onFailure));
connectHandler.connect(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure));
} else {
fetchShardsInternal(searchRequest, indices, listener);
fetchShardsInternal(searchRequest, listener);
}
}
private void fetchShardsInternal(SearchRequest searchRequest, String[] indices,
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = nodeSupplier.get();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
.routing(searchRequest.routing());
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {
@ -224,7 +221,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
};
}
@Override
Transport.Connection getConnection() {
DiscoveryNode discoveryNode = nodeSupplier.get();
return transportService.getConnection(discoveryNode);
}
@Override
public void close() throws IOException {
connectHandler.close();
}

View File

@ -24,10 +24,12 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
@ -176,6 +178,25 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.get(remoteCluster).isNodeConnected(node);
}
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) {
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
}
} else {
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(indices, indicesOptions));
}
return originalIndicesMap;
}
/**
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
*/
@ -183,8 +204,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return remoteClusters.containsKey(clusterName);
}
public void collectSearchShards(SearchRequest searchRequest, Map<String, OriginalIndices> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing,
Map<String, OriginalIndices> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
@ -195,7 +217,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
}
final String[] indices = entry.getValue().indices();
remoteClusterConnection.fetchSearchShards(searchRequest, indices,
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(indicesOptions).local(true).preference(preference)
.routing(routing);
remoteClusterConnection.fetchSearchShards(searchShardsRequest,
new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
@ -240,6 +265,14 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
return connection.getConnection(node);
}
public Transport.Connection getConnection(String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection();
}
@Override
protected Set<String> getRemoteClusterNames() {
return this.remoteClusters.keySet();

View File

@ -385,7 +385,10 @@ public class RemoteClusterConnectionTests extends ESTestCase {
failReference.set(x);
responseLatch.countDown();
});
connection.fetchSearchShards(request, new String[]{"test-index"}, shardsListener);
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(new String[]{"test-index"})
.indicesOptions(request.indicesOptions()).local(true).preference(request.preference())
.routing(request.routing());
connection.fetchSearchShards(searchShardsRequest, shardsListener);
responseLatch.await();
assertNull(failReference.get());
assertNotNull(reference.get());

View File

@ -0,0 +1,66 @@
---
"Get simple field caps from remote cluster":
- skip:
version: " - 5.99.99"
reason: this uses a new API that has been added in 6.0.0
- do:
indices.create:
index: field_caps_index_2
body:
mappings:
t:
properties:
text:
type: text
keyword:
type: keyword
number:
type: double
geo:
type: geo_point
object:
type: object
properties:
nested1 :
type : text
index: true
nested2:
type: float
doc_values: true
- do:
field_caps:
index: 'field_caps_index_2,my_remote_cluster:field_*'
fields: [text, keyword, number, geo]
- match: {fields.text.text.searchable: true}
- match: {fields.text.text.aggregatable: false}
- is_false: fields.text.text.indices
- is_false: fields.text.text.non_searchable_indices
- is_false: fields.text.text.non_aggregatable_indices
- match: {fields.keyword.keyword.searchable: true}
- match: {fields.keyword.keyword.aggregatable: true}
- is_false: fields.text.keyword.indices
- is_false: fields.text.keyword.non_searchable_indices
- is_false: fields.text.keyword.non_aggregatable_indices
- match: {fields.number.double.searchable: true}
- match: {fields.number.double.aggregatable: true}
- match: {fields.number.double.indices: ["field_caps_index_2", "my_remote_cluster:field_caps_index_1"]}
- is_false: fields.number.double.non_searchable_indices
- is_false: fields.number.double.non_aggregatable_indices
- match: {fields.number.long.searchable: true}
- match: {fields.number.long.aggregatable: true}
- match: {fields.number.long.indices: ["my_remote_cluster:field_caps_index_3"]}
- is_false: fields.number.long.non_searchable_indices
- is_false: fields.number.long.non_aggregatable_indices
- match: {fields.geo.geo_point.searchable: true}
- match: {fields.geo.geo_point.aggregatable: true}
- match: {fields.geo.geo_point.indices: ["field_caps_index_2", "my_remote_cluster:field_caps_index_1"]}
- is_false: fields.geo.geo_point.non_searchable_indices
- is_false: fields.geo.geo_point.non_aggregatable_indices
- match: {fields.geo.keyword.searchable: true}
- match: {fields.geo.keyword.aggregatable: true}
- match: {fields.geo.keyword.indices: ["my_remote_cluster:field_caps_index_3"]}
- is_false: fields.geo.keyword.non_searchable_indices
- is_false: fields.geo.keyword.on_aggregatable_indices

View File

@ -1,6 +1,54 @@
---
"Index data and search on the old cluster":
- do:
indices.create:
index: field_caps_index_1
body:
mappings:
t:
properties:
text:
type: text
keyword:
type: keyword
number:
type: double
geo:
type: geo_point
object:
type: object
properties:
nested1 :
type : text
index: false
nested2:
type: float
doc_values: false
- do:
indices.create:
index: field_caps_index_3
body:
mappings:
t:
properties:
text:
type: text
keyword:
type: keyword
number:
type: long
geo:
type: keyword
object:
type: object
properties:
nested1 :
type : long
index: false
nested2:
type: keyword
doc_values: false
- do:
indices.create:
index: test_index

View File

@ -76,7 +76,7 @@ setup:
---
"Get simple field caps":
- skip:
version: " - 5.3.99"
version: " - 5.99.99" # temporarily disabled until bwc code is backported and propagated
reason: this uses a new API that has been added in 5.4.0
- do:
@ -117,7 +117,7 @@ setup:
---
"Get nested field caps":
- skip:
version: " - 5.3.99"
version: " - 5.99.99" # temporarily disabled until bwc code is backported and propagated
reason: this uses a new API that has been added in 5.4.0
- do:
@ -148,7 +148,7 @@ setup:
---
"Get prefix field caps":
- skip:
version: " - 5.3.99"
version: " - 5.99.99" # temporarily disabled until bwc code is backported and propagated
reason: this uses a new API that has been added in 5.4.0
- do: