Cross Cluster Search: propagate original indices per cluster (#24328)

In a cross cluster search, the coordinating node should split the original indices per cluster and send each remote cluster only its own subset of the original indices, rather than the full set taken from the original search request, which contains all the indices.

Each remote cluster should not be aware of the indices that belong to other remote clusters.
Luca Cavanna 2017-04-26 21:45:49 +02:00 committed by GitHub
parent 2ed1f7a339
commit 149629fec6
51 changed files with 443 additions and 283 deletions
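
A minimal sketch of the idea, simplified from the TransportSearchAction changes in this commit; the grouping helper and variable names are illustrative, not the exact production code:

// Group the requested index expressions by cluster alias, then wrap each
// cluster's subset in its own OriginalIndices instance, so a remote cluster
// only ever receives the indices that belong to it.
Map<String, List<String>> groupedIndices = groupClusterIndices(searchRequest.indices()); // hypothetical helper
Map<String, OriginalIndices> remoteClusterIndices = new HashMap<>();
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
    List<String> clusterIndices = entry.getValue();
    remoteClusterIndices.put(entry.getKey(),
            new OriginalIndices(clusterIndices.toArray(new String[clusterIndices.size()]),
                    searchRequest.indicesOptions()));
}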

View File

@ -28,7 +28,10 @@ import java.io.IOException;
/**
* Used to keep track of original indices within internal (e.g. shard level) requests
*/
public class OriginalIndices implements IndicesRequest {
public final class OriginalIndices implements IndicesRequest {
//constant to use when original indices are not applicable and will not be serialized across the wire
public static final OriginalIndices NONE = new OriginalIndices(null, null);
private final String[] indices;
private final IndicesOptions indicesOptions;
@ -39,7 +42,6 @@ public class OriginalIndices implements IndicesRequest {
public OriginalIndices(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices;
assert indicesOptions != null;
this.indicesOptions = indicesOptions;
}
@ -57,8 +59,8 @@ public class OriginalIndices implements IndicesRequest {
return new OriginalIndices(in.readStringArray(), IndicesOptions.readIndicesOptions(in));
}
public static void writeOriginalIndices(OriginalIndices originalIndices, StreamOutput out) throws IOException {
assert originalIndices != NONE;
out.writeStringArrayNullable(originalIndices.indices);
originalIndices.indicesOptions.writeIndicesOptions(out);
}
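
To make the new contract explicit: NONE is an in-memory placeholder for "original indices not applicable" and, per the assert above, must never be written to the wire. A minimal sketch of a caller respecting that contract (the helper name is hypothetical, StreamOutput assumed in scope):

static void writeIfApplicable(OriginalIndices originalIndices, StreamOutput out) throws IOException {
    if (originalIndices != OriginalIndices.NONE) {
        // only real original indices are serialized; callers holding NONE skip this entirely
        OriginalIndices.writeOriginalIndices(originalIndices, out);
    }
}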

View File

@ -88,7 +88,7 @@ public class TransportClusterSearchShardsAction extends
}
Set<String> nodeIds = new HashSet<>();
GroupShardsIterator groupShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices,
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices,
routingMap, request.preference());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -75,8 +74,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task, SearchPhaseResults<Result> resultConsumer) {
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer) {
super(name, request, shardsIts, logger);
this.timeProvider = timeProvider;
this.logger = logger;
@ -209,8 +209,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
results.getSuccessfulResults().forEach((entry) -> {
try {
Transport.Connection connection = nodeIdToConnection.apply(entry.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(entry.getRequestId(), connection);
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = nodeIdToConnection.apply(searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
@ -296,11 +297,11 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
listener.onFailure(e);
}
public final ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) {
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) {
AliasFilter filter = aliasFilter.get(shard.index().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(),
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis());
}

View File

@ -73,7 +73,8 @@ final class DfsQueryPhase extends SearchPhase {
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(context.getRequest(), dfsResult.getRequestId(), dfs);
QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(),
dfsResult.getRequestId(), dfs);
final int shardIndex = dfsResult.getShardIndex();
searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(),
new SearchActionListener<QuerySearchResult>(searchShardTarget, shardIndex) {
@ -95,7 +96,7 @@ final class DfsQueryPhase extends SearchPhase {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be on the safe side
context.sendReleaseSearchContext(querySearchRequest.id(), connection);
context.sendReleaseSearchContext(querySearchRequest.id(), connection, searchShardTarget.getOriginalIndices());
}
}
});

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@ -73,7 +74,6 @@ final class FetchSearchPhase extends SearchPhase {
this.context = context;
this.logger = context.getLogger();
this.resultConsumer = resultConsumer;
}
@Override
@ -112,7 +112,7 @@ final class FetchSearchPhase extends SearchPhase {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(e -> e.queryResult())
.map(SearchPhaseResult::queryResult)
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
finishPhase.run();
} else {
@ -135,10 +135,11 @@ final class FetchSearchPhase extends SearchPhase {
// in any case we count down this result since we don't talk to this shard anymore
counter.countDown();
} else {
Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard);
executeFetch(i, queryResult.getSearchShardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
connection);
}
}
@ -147,9 +148,9 @@ final class FetchSearchPhase extends SearchPhase {
}
protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
ScoreDoc[] lastEmittedDocPerShard) {
ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) {
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
return new ShardFetchSearchRequest(context.getRequest(), queryId, entry, lastEmittedDoc);
return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc);
}
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
@ -189,8 +190,9 @@ final class FetchSearchPhase extends SearchPhase {
// and if it has at least one hit that didn't make it to the global topDocs
if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
try {
Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);
}

View File

@ -46,12 +46,12 @@ import java.util.stream.Stream;
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
private final SearchRequest request;
private final GroupShardsIterator shardsIts;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
private final Logger logger;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator shardsIts, Logger logger) {
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger) {
super(name);
this.request = request;
this.shardsIts = shardsIts;
@ -64,10 +64,10 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final ShardIterator shardIt, Exception e) {
final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId());
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getOriginalIndices());
onShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
@ -124,7 +124,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
@Override
public final void run() throws IOException {
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
for (final SearchShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
@ -136,7 +136,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
}
}
private void performPhaseOnShard(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// TODO upgrade this to an assert...
// no more active shards... (we should not really get here, but just for safety)
@ -144,7 +144,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
} else {
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId()), shardIndex) {
shardIt.shardId(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
onShardResult(result, shardIt);
@ -213,7 +213,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
* @param shard the shard routing to send the request for
* @param listener the listener to notify on response
*/
protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener<FirstResult> listener);
protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<FirstResult> listener);
/**
* This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing

View File

@ -162,7 +162,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
*/
public void fetchSearchShards(SearchRequest searchRequest, final List<String> indices,
public void fetchSearchShards(SearchRequest searchRequest, final String[] indices,
ActionListener<ClusterSearchShardsResponse> listener) {
if (connectedNodes.isEmpty()) {
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
@ -176,10 +176,10 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
}
}
private void fetchShardsInternal(SearchRequest searchRequest, List<String> indices,
private void fetchShardsInternal(SearchRequest searchRequest, String[] indices,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = nodeSupplier.get();
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices.toArray(new String[indices.size()]))
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
.indicesOptions(searchRequest.indicesOptions()).local(true).preference(searchRequest.preference())
.routing(searchRequest.routing());
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,

View File

@ -22,14 +22,13 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
@ -243,18 +242,18 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
return remoteClusters.containsKey(clusterName);
}
void collectSearchShards(SearchRequest searchRequest, Map<String, List<String>> remoteIndicesByCluster,
void collectSearchShards(SearchRequest searchRequest, Map<String, OriginalIndices> remoteIndicesByCluster,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, List<String>> entry : remoteIndicesByCluster.entrySet()) {
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterName);
}
final List<String> indices = entry.getValue();
final String[] indices = entry.getValue().indices();
remoteClusterConnection.fetchSearchShards(searchRequest, indices,
new ActionListener<ClusterSearchShardsResponse>() {
@Override
@ -288,16 +287,16 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
}
}
Function<String, Transport.Connection> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
List<ShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterName = entry.getKey();
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterName));
nodeToCluster.put(remoteNode.getId(), () -> getConnection(remoteNode, clusterAlias));
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
@ -305,9 +304,11 @@ public final class RemoteClusterService extends AbstractComponent implements Clo
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index remoteIndex = shardId.getIndex();
Index index = new Index(clusterName + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID());
ShardIterator shardIterator = new PlainShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()));
Index index = new Index(clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(), remoteIndex.getUUID());
OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null;
SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
if (indicesAndFilters == null) {

View File

@ -49,5 +49,4 @@ abstract class SearchActionListener<T extends SearchPhaseResult> implements Acti
}
protected abstract void innerOnResponse(T response);
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
@ -46,7 +45,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator shardsIts,
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion,
final SearchTask task) {
@ -70,7 +69,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
@Override
protected void executePhaseOnShard(
final ShardIterator shardIt,
final SearchShardIterator shardIt,
final ShardRouting shard,
final SearchActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),

View File

@ -20,7 +20,7 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
@ -97,16 +97,16 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
* @see org.elasticsearch.search.fetch.FetchSearchResult#getRequestId()
*
*/
default void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
default void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) {
if (connection != null) {
getSearchTransport().sendFreeContext(connection, contextId, getRequest());
getSearchTransport().sendFreeContext(connection, contextId, originalIndices);
}
}
/**
* Builds a request for the initial search phase.
*/
ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard);
ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard);
/**
* Processes the phase transition from one phase to another. This method handles all errors that happen during the initial run execution

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
@ -32,8 +31,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
final class SearchQueryThenFetchAsyncAction
extends AbstractSearchAsyncAction<SearchPhaseResult> {
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
private final SearchPhaseController searchPhaseController;
@ -47,7 +45,7 @@ final class SearchQueryThenFetchAsyncAction
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator shardsIts,
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion,
SearchTask task) {
@ -70,7 +68,7 @@ final class SearchQueryThenFetchAsyncAction
}
protected void executePhaseOnShard(
final ShardIterator shardIt,
final SearchShardIterator shardIt,
final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
getSearchTransport().sendExecuteQuery(

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;
import java.util.List;
/**
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
* of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices.
*/
public final class SearchShardIterator extends PlainShardIterator {
private final OriginalIndices originalIndices;
/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* for the given <code>shardId</code>.
*
* @param shardId shard id of the group
* @param shards shards to iterate
*/
public SearchShardIterator(ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
super(shardId, shards);
this.originalIndices = originalIndices;
}
/**
* Returns the original indices associated with this shard iterator, specifically with the cluster that this shard belongs to.
*/
public OriginalIndices getOriginalIndices() {
return originalIndices;
}
}
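
A brief usage sketch with illustrative values: the iterator behaves like a PlainShardIterator, and the coordinating node can read back the per-cluster OriginalIndices when it builds shard-level requests (shardId and shardRoutings are assumed to be given):

OriginalIndices clusterOneIndices =
        new OriginalIndices(new String[]{"logs-2017"}, IndicesOptions.lenientExpandOpen());
SearchShardIterator shardIt = new SearchShardIterator(shardId, shardRoutings, clusterOneIndices);
ShardRouting shard = shardIt.nextOrNull();             // iteration works exactly as before
OriginalIndices toSend = shardIt.getOriginalIndices(); // only this cluster's subset of indices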

View File

@ -92,8 +92,8 @@ public class SearchTransportService extends AbstractLifecycleComponent {
}
}
public void sendFreeContext(Transport.Connection connection, final long contextId, SearchRequest request) {
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
@ -219,9 +219,9 @@ public class SearchTransportService extends AbstractLifecycleComponent {
SearchFreeContextRequest() {
}
SearchFreeContextRequest(SearchRequest request, long id) {
SearchFreeContextRequest(OriginalIndices originalIndices, long id) {
super(id);
this.originalIndices = new OriginalIndices(request);
this.originalIndices = originalIndices;
}
@Override

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
@ -212,7 +213,8 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
}
return new ShardSearchFailure(exception,
new SearchShardTarget(nodeId, new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId)));
new SearchShardTarget(nodeId,
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), OriginalIndices.NONE));
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
@ -162,10 +163,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
long getRelativeCurrentNanos() {
return relativeCurrentNanosProvider.getAsLong();
}
}
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
final long absoluteStartMillis = System.currentTimeMillis();
@ -173,17 +172,27 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
final String[] localIndices;
final Map<String, List<String>> remoteClusterIndices;
final OriginalIndices localIndices;
final Map<String, OriginalIndices> remoteClusterIndices;
final ClusterState clusterState = clusterService.state();
if (remoteClusterService.isCrossClusterSearchEnabled()) {
remoteClusterIndices = remoteClusterService.groupClusterIndices( searchRequest.indices(), // empty string is not allowed
final Map<String, List<String>> groupedIndices = remoteClusterService.groupClusterIndices(searchRequest.indices(),
// empty string is not allowed
idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
List<String> remove = remoteClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
localIndices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
List<String> remove = groupedIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
String[] indices = remove == null ? Strings.EMPTY_ARRAY : remove.toArray(new String[remove.size()]);
localIndices = new OriginalIndices(indices, searchRequest.indicesOptions());
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), searchRequest.indicesOptions()));
}
remoteClusterIndices = Collections.unmodifiableMap(originalIndicesMap);
} else {
remoteClusterIndices = Collections.emptyMap();
localIndices = searchRequest.indices();
localIndices = new OriginalIndices(searchRequest);
}
if (remoteClusterIndices.isEmpty()) {
@ -192,18 +201,18 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
ActionListener.wrap((searchShardsResponses) -> {
List<ShardIterator> remoteShardIterators = new ArrayList<>();
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
searchShardsResponses, remoteShardIterators, remoteAliasFilters);
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
}, listener::onFailure));
}
}
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, String[] localIndices,
List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
@ -212,11 +221,11 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final Index[] indices;
if (localIndices.length == 0 && remoteShardIterators.size() > 0) {
if (localIndices.indices().length == 0 && remoteShardIterators.size() > 0) {
indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
timeProvider.getAbsoluteStartMillis(), localIndices);
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
}
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
@ -225,9 +234,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, remoteShardIterators);
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
@ -268,19 +277,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
}
private static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator,
List<ShardIterator> remoteShardIterators) {
if (remoteShardIterators.isEmpty()) {
return localShardsIterator;
}
List<ShardIterator> shards = new ArrayList<>();
for (ShardIterator shardIterator : remoteShardIterators) {
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators) {
List<SearchShardIterator> shards = new ArrayList<>();
for (SearchShardIterator shardIterator : remoteShardIterators) {
shards.add(shardIterator);
}
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(shardIterator);
shards.add(new SearchShardIterator(shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
return new GroupShardsIterator(shards);
return new GroupShardsIterator<>(shards);
}
@Override
@ -288,7 +295,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
throw new UnsupportedOperationException("the task parameter is required");
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators,
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider, Function<String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
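
The net effect, sketched with hypothetical indices: for a request targeting "cluster_one:foo" and a local index "baz", every shard-level request is built from the OriginalIndices attached to its own shard iterator, never from the full index list (searchRequest, shardAliasFilter, indexBoost and nowInMillis are assumed in scope):

for (SearchShardIterator shardIt : shardIterators) {
    // e.g. ["foo"] for cluster_one shards, ["baz"] for local shards
    OriginalIndices perCluster = shardIt.getOriginalIndices();
    ShardSearchTransportRequest shardRequest = new ShardSearchTransportRequest(
            perCluster, searchRequest, shardIt.shardId(), shardIterators.size(),
            shardAliasFilter, indexBoost, nowInMillis);
}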

View File

@ -94,7 +94,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
* Determines the shards this operation will be executed on. The operation is executed once per shard iterator, typically
* on the first shard in it. If the operation fails, it will be retried on the next shard in the iterator.
*/
protected abstract GroupShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices);
protected abstract GroupShardsIterator<ShardIterator> shards(ClusterState clusterState, Request request, String[] concreteIndices);
protected abstract ClusterBlockException checkGlobalBlock(ClusterState state, Request request);
@ -107,7 +107,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
private final ActionListener<Response> listener;
private final ClusterState clusterState;
private final DiscoveryNodes nodes;
private final GroupShardsIterator shardsIts;
private final GroupShardsIterator<ShardIterator> shardsIts;
private final int expectedOps;
private final AtomicInteger counterOps = new AtomicInteger();
private final AtomicReferenceArray shardsResponses;

View File

@ -270,7 +270,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
ShardsIterator shardIt = shards(clusterState, request, concreteIndices);
nodeIds = new HashMap<>();
for (ShardRouting shard : shardIt.asUnordered()) {
for (ShardRouting shard : shardIt) {
// send a request to the shard only if it is assigned to a node that is in the local node's cluster state
// a scenario in which a shard can be assigned but to a node that is not in the local node's cluster state
// is when the shard is assigned to the master node, the local node has detected the master as failed

View File

@ -58,7 +58,7 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
protected ShardIterator shards(ClusterState state, InternalRequest request) {
if (request.request().doc() != null && request.request().routing() == null) {
// artificial document without routing specified, ignore its "id" and use either random shard or according to preference
GroupShardsIterator groupShardsIter = clusterService.operationRouting().searchShards(state,
GroupShardsIterator<ShardIterator> groupShardsIter = clusterService.operationRouting().searchShards(state,
new String[] { request.concreteIndex() }, null, request.request().preference());
return groupShardsIter.iterator().next();
}

View File

@ -30,14 +30,14 @@ import java.util.List;
* ShardsIterators are always returned in ascending order independently of their order at construction
* time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs.
*/
public final class GroupShardsIterator implements Iterable<ShardIterator> {
public final class GroupShardsIterator<ShardIt extends ShardIterator> implements Iterable<ShardIt> {
private final List<ShardIterator> iterators;
private final List<ShardIt> iterators;
/**
* Constructs a new GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIterator> iterators) {
public GroupShardsIterator(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
this.iterators = iterators;
}
@ -60,7 +60,7 @@ public final class GroupShardsIterator implements Iterable<ShardIterator> {
*/
public int totalSizeWith1ForEmpty() {
int size = 0;
for (ShardIterator shard : iterators) {
for (ShardIt shard : iterators) {
size += Math.max(1, shard.size());
}
return size;
@ -75,7 +75,7 @@ public final class GroupShardsIterator implements Iterable<ShardIterator> {
}
@Override
public Iterator<ShardIterator> iterator() {
public Iterator<ShardIt> iterator() {
return iterators.iterator();
}
}
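
With the type parameter, callers iterate over the concrete iterator type directly; a small sketch (the list is assumed to be filled elsewhere):

List<SearchShardIterator> searchShards = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> group = new GroupShardsIterator<>(searchShards);
for (SearchShardIterator it : group) {
    OriginalIndices originalIndices = it.getOriginalIndices(); // no casts needed anymore
}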

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -265,37 +264,6 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
return new PlainShardsIterator(shuffler.shuffle(allActiveShards));
}
/**
* A group shards iterator where each group ({@link ShardIterator}
* is an iterator across shard replication group.
*/
public GroupShardsIterator groupByShardsIt() {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>(shards.size());
for (IndexShardRoutingTable indexShard : this) {
set.add(indexShard.shardsIt());
}
return new GroupShardsIterator(set);
}
/**
* A groups shards iterator where each groups is a single {@link ShardRouting} and a group
* is created for each shard routing.
* <p>
* This basically means that components that use the {@link GroupShardsIterator} will iterate
* over *all* the shards (all the replicas) within the index.</p>
*/
public GroupShardsIterator groupByAllIt() {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
for (IndexShardRoutingTable indexShard : this) {
for (ShardRouting shardRouting : indexShard) {
set.add(shardRouting.shardsIt());
}
}
return new GroupShardsIterator(set);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -68,7 +68,7 @@ public class OperationRouting extends AbstractComponent {
return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference);
}
public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing, @Nullable String preference) {
public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map<String, Set<String>> routing, @Nullable String preference) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
@ -77,7 +77,7 @@ public class OperationRouting extends AbstractComponent {
set.add(iterator);
}
}
return new GroupShardsIterator(new ArrayList<>(set));
return new GroupShardsIterator<>(new ArrayList<>(set));
}
private static final Map<String, Set<String>> EMPTY_ROUTING = Collections.emptyMap();

View File

@ -43,7 +43,6 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter
this.shardId = shardId;
}
@Override
public ShardId shardId() {
return this.shardId;

View File

@ -18,6 +18,8 @@
*/
package org.elasticsearch.cluster.routing;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
@ -74,7 +76,12 @@ public class PlainShardsIterator implements ShardsIterator {
}
@Override
public Iterable<ShardRouting> asUnordered() {
return shards;
public List<ShardRouting> getShardRoutings() {
return Collections.unmodifiableList(shards);
}
@Override
public Iterator<ShardRouting> iterator() {
return shards.iterator();
}
}

View File

@ -238,7 +238,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, includeRelocationTargets, ACTIVE_PREDICATE);
}
public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) {
public GroupShardsIterator<ShardIterator> allAssignedShardsGrouped(String[] indices, boolean includeEmpty) {
return allAssignedShardsGrouped(indices, includeEmpty, false);
}
@ -249,14 +249,14 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
* @param includeRelocationTargets if true, an <b>extra</b> shard iterator will be added for relocating shards. The extra
* iterator contains a single ShardRouting pointing at the relocating target
*/
public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) {
public GroupShardsIterator<ShardIterator> allAssignedShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) {
return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, includeRelocationTargets, ASSIGNED_PREDICATE);
}
private static Predicate<ShardRouting> ACTIVE_PREDICATE = shardRouting -> shardRouting.active();
private static Predicate<ShardRouting> ASSIGNED_PREDICATE = shardRouting -> shardRouting.assignedToNode();
private static Predicate<ShardRouting> ACTIVE_PREDICATE = ShardRouting::active;
private static Predicate<ShardRouting> ASSIGNED_PREDICATE = ShardRouting::assignedToNode;
private GroupShardsIterator allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets, Predicate<ShardRouting> predicate) {
private GroupShardsIterator<ShardIterator> allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets, Predicate<ShardRouting> predicate) {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
for (String index : indices) {
@ -278,7 +278,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
}
}
}
return new GroupShardsIterator(set);
return new GroupShardsIterator<>(set);
}
public ShardsIterator allShards(String[] indices) {
@ -320,9 +320,8 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
* @param indices The indices to return all the shards (replicas)
* @return All the primary shards grouped into a single shard element group each
* @throws IndexNotFoundException If an index passed does not exist
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator activePrimaryShardsGrouped(String[] indices, boolean includeEmpty) {
public GroupShardsIterator<ShardIterator> activePrimaryShardsGrouped(String[] indices, boolean includeEmpty) {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<>();
for (String index : indices) {
@ -339,7 +338,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
}
}
}
return new GroupShardsIterator(set);
return new GroupShardsIterator<>(set);
}
@Override

View File

@ -18,10 +18,12 @@
*/
package org.elasticsearch.cluster.routing;
import java.util.List;
/**
* Allows to iterate over unrelated shards.
*/
public interface ShardsIterator {
public interface ShardsIterator extends Iterable<ShardRouting> {
/**
* Resets the iterator to its initial state.
@ -60,6 +62,9 @@ public interface ShardsIterator {
@Override
boolean equals(Object other);
Iterable<ShardRouting> asUnordered();
/**
* Returns the {@link ShardRouting}s that this shards iterator holds.
*/
List<ShardRouting> getShardRoutings();
}
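
Callers migrate from asUnordered() to direct iteration or the new accessor; a rough sketch (shardIt is assumed to be any ShardsIterator):

// before: for (ShardRouting shard : shardIt.asUnordered()) { ... }
for (ShardRouting shard : shardIt) {                        // ShardsIterator is now Iterable
    // ...
}
List<ShardRouting> routings = shardIt.getShardRoutings();   // unmodifiable view in PlainShardsIterator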

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search;
import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
@ -544,7 +545,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<S
ShardId shardId = get(Fields._SHARD, values, null);
String nodeId = get(Fields._NODE, values, null);
if (shardId != null && nodeId != null) {
searchHit.shard(new SearchShardTarget(nodeId, shardId));
searchHit.shard(new SearchShardTarget(nodeId, shardId, OriginalIndices.NONE));
}
searchHit.fields(fields);
return searchHit;

View File

@ -24,6 +24,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
@ -40,7 +41,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.collapse.CollapseContext;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.IndexEventListener;
@ -55,6 +55,7 @@ import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseContext;
import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchPhase;
@ -498,7 +499,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
throws IOException {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(), indexShard.shardId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), OriginalIndices.NONE);
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,

View File

@ -19,6 +19,7 @@
package org.elasticsearch.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -36,6 +37,9 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
private final Text nodeId;
private final ShardId shardId;
//original indices are only needed in the coordinating node throughout the search request execution.
//no need to serialize them as part of SearchShardTarget.
private final transient OriginalIndices originalIndices;
public SearchShardTarget(StreamInput in) throws IOException {
if (in.readBoolean()) {
@ -44,15 +48,18 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
nodeId = null;
}
shardId = ShardId.readShardId(in);
this.originalIndices = null;
}
public SearchShardTarget(String nodeId, ShardId shardId) {
public SearchShardTarget(String nodeId, ShardId shardId, OriginalIndices originalIndices) {
this.nodeId = nodeId == null ? null : new Text(nodeId);
this.shardId = shardId;
this.originalIndices = originalIndices;
}
//this constructor is only used in tests
public SearchShardTarget(String nodeId, Index index, int shardId) {
this(nodeId, new ShardId(index, shardId));
this(nodeId, new ShardId(index, shardId), OriginalIndices.NONE);
}
@Nullable
@ -72,6 +79,10 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
return shardId;
}
public OriginalIndices getOriginalIndices() {
return originalIndices;
}
@Override
public int compareTo(SearchShardTarget o) {
int i = shardId.getIndexName().compareTo(o.getIndex());
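
Worth noting from the hunk above: originalIndices is transient and deliberately left out of serialization, so it is only meaningful on the coordinating node. A sketch of the observable behaviour (shardId and originalIndices assumed in scope):

SearchShardTarget target = new SearchShardTarget("node_1", shardId, originalIndices);
target.getOriginalIndices();   // non-null while the coordinating node drives the search
// After a wire round-trip (writeTo plus the StreamInput constructor),
// getOriginalIndices() returns null: the field never leaves the coordinating node.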

View File

@ -23,7 +23,6 @@ import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -42,9 +41,9 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
}
public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
public ShardFetchSearchRequest(OriginalIndices originalIndices, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(id, list, lastEmittedDoc);
this.originalIndices = new OriginalIndices(request);
this.originalIndices = originalIndices;
}
@Override

View File

@ -53,11 +53,11 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
public ShardSearchTransportRequest(){
}
public ShardSearchTransportRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
public ShardSearchTransportRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis) {
this.shardSearchLocalRequest = new ShardSearchLocalRequest(searchRequest, shardId, numberOfShards, aliasFilter, indexBoost,
nowInMillis);
this.originalIndices = new OriginalIndices(searchRequest);
this.originalIndices = originalIndices;
}
@Override
@ -76,7 +76,6 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
return originalIndices.indicesOptions();
}
@Override
public ShardId shardId() {
return shardSearchLocalRequest.shardId();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.search.query;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
@ -47,10 +46,10 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
public QuerySearchRequest() {
}
public QuerySearchRequest(SearchRequest request, long id, AggregatedDfs dfs) {
public QuerySearchRequest(OriginalIndices originalIndices, long id, AggregatedDfs dfs) {
this.id = id;
this.dfs = dfs;
this.originalIndices = new OriginalIndices(request);
this.originalIndices = originalIndices;
}
public long id() {

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterChangedEvent;

View File

@ -21,6 +21,7 @@ package org.elasticsearch;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
@ -758,9 +759,9 @@ public class ElasticsearchExceptionTests extends ESTestCase {
failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause);
ShardSearchFailure[] shardFailures = new ShardSearchFailure[]{
new ShardSearchFailure(new ParsingException(0, 0, "Parsing g", null),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 61))),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 61), OriginalIndices.NONE)),
new ShardSearchFailure(new RepositoryException("repository_g", "Repo"),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62))),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), OriginalIndices.NONE)),
new ShardSearchFailure(new SearchContextMissingException(0L), null)
};
failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures);

View File

@ -19,11 +19,8 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.test.ESTestCase;
@ -56,48 +53,6 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
System::nanoTime);
}
final ShardIterator it = new ShardIterator() {
@Override
public ShardId shardId() {
return null;
}
@Override
public void reset() {
}
@Override
public int compareTo(ShardIterator o) {
return 0;
}
@Override
public int size() {
return 0;
}
@Override
public int sizeActive() {
return 0;
}
@Override
public ShardRouting nextOrNull() {
return null;
}
@Override
public int remaining() {
return 0;
}
@Override
public Iterable<ShardRouting> asUnordered() {
return null;
}
};
return new AbstractSearchAsyncAction<SearchPhaseResult>(
"test",
null,
@ -108,7 +63,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
null,
null,
null,
new GroupShardsIterator(Collections.singletonList(it)),
new GroupShardsIterator<>(Collections.singletonList(new SearchShardIterator(null, Collections.emptyList(), null))),
timeProvider,
0,
null,
@ -123,7 +78,7 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
@Override
protected void executePhaseOnShard(
final ShardIterator shardIt,
final SearchShardIterator shardIt,
final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
@ -157,5 +112,4 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get())));
}
}
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
@ -29,14 +29,11 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.transport.Transport;
import org.junit.Assert;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -114,7 +111,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public ShardSearchTransportRequest buildShardSearchRequest(ShardIterator shardIt, ShardRouting shard) {
public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) {
Assert.fail("should not be called");
return null;
}
@ -145,7 +142,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public void sendReleaseSearchContext(long contextId, Transport.Connection connection) {
public void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) {
releasedSearchContexts.add(contextId);
}
}

View File

@ -382,7 +382,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
failReference.set(x);
responseLatch.countDown();
});
connection.fetchSearchShards(request, Arrays.asList("test-index"), shardsListener);
connection.fetchSearchShards(request, new String[]{"test-index"}, shardsListener);
responseLatch.await();
assertNull(failReference.get());
assertNotNull(reference.get());

View File

@ -20,10 +20,11 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
@ -204,7 +205,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
public void testProcessRemoteShards() throws IOException {
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, null)) {
assertFalse(service.isCrossClusterSearchEnabled());
List<ShardIterator> iteratorList = new ArrayList<>();
List<SearchShardIterator> iteratorList = new ArrayList<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
DiscoveryNode[] nodes = new DiscoveryNode[] {
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
@ -225,11 +226,26 @@ public class RemoteClusterServiceTests extends ESTestCase {
TestShardRouting.newShardRouting("bar", 0, "node1", false, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_1", new ClusterSearchShardsResponse(groups, nodes, indicesAndAliases));
DiscoveryNode[] nodes2 = new DiscoveryNode[] {
new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT)
};
ClusterSearchShardsGroup[] groups2 = new ClusterSearchShardsGroup[] {
new ClusterSearchShardsGroup(new ShardId("xyz", "xyz_id", 0),
new ShardRouting[] {TestShardRouting.newShardRouting("xyz", 0, "node3", true, ShardRoutingState.STARTED)})
};
searchShardsResponseMap.put("test_cluster_2", new ClusterSearchShardsResponse(groups2, nodes2, null));
Map<String, OriginalIndices> remoteIndicesByCluster = new HashMap<>();
remoteIndicesByCluster.put("test_cluster_1",
new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
remoteIndicesByCluster.put("test_cluster_2",
new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
service.processRemoteShards(searchShardsResponseMap, iteratorList, remoteAliases);
assertEquals(3, iteratorList.size());
for (ShardIterator iterator : iteratorList) {
service.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList, remoteAliases);
assertEquals(4, iteratorList.size());
for (SearchShardIterator iterator : iteratorList) {
if (iterator.shardId().getIndexName().endsWith("foo")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
assertEquals("test_cluster_1:foo", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
@ -239,7 +255,8 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
assertNull(iterator.nextOrNull());
} else {
} else if (iterator.shardId().getIndexName().endsWith("bar")) {
assertArrayEquals(new String[]{"fo*", "ba*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_1:bar", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
@ -249,13 +266,23 @@ public class RemoteClusterServiceTests extends ESTestCase {
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("xyz")) {
assertArrayEquals(new String[]{"x*"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_2:xyz", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "xyz");
assertNull(iterator.nextOrNull());
}
}
assertEquals(2, remoteAliases.size());
assertEquals(3, remoteAliases.size());
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("foo_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("bar_id"));
assertTrue(remoteAliases.toString(), remoteAliases.containsKey("xyz_id"));
assertEquals(new TermsQueryBuilder("foo", "bar"), remoteAliases.get("foo_id").getQueryBuilder());
assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder());
assertNull(remoteAliases.get("xyz_id").getQueryBuilder());
}
}
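A minimal sketch of the bookkeeping this test exercises (sketch class name invented, assumes placement in the org.elasticsearch.action.search package): one OriginalIndices per remote cluster alias, and every shard iterator produced for a cluster reports only that cluster's expressions.

package org.elasticsearch.action.search;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;

class PerClusterIndicesSketch {

    // One OriginalIndices per remote cluster alias; a cluster never sees expressions
    // that were addressed to another cluster.
    static Map<String, OriginalIndices> remoteIndicesByCluster() {
        Map<String, OriginalIndices> byCluster = new HashMap<>();
        byCluster.put("test_cluster_1",
            new OriginalIndices(new String[]{"fo*", "ba*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
        byCluster.put("test_cluster_2",
            new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
        return byCluster;
    }

    // Reading the indices back from an iterator built for a given cluster.
    static String[] indicesFor(SearchShardIterator iterator) {
        return iterator.getOriginalIndices().indices();
    }
}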

View File

@ -20,11 +20,11 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
@ -76,12 +76,14 @@ public class SearchAsyncActionTests extends ESTestCase {
Map<DiscoveryNode, Set<Long>> nodeToContextMap = new HashMap<>();
AtomicInteger contextIdGenerator = new AtomicInteger(0);
GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(RemoteClusterService.REMOTE_CLUSTERS_SEEDS)), null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId, SearchRequest request) {
public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId));
@ -110,7 +112,7 @@ public class SearchAsyncActionTests extends ESTestCase {
TestSearchResponse response = new TestSearchResponse();
@Override
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, SearchActionListener<TestSearchPhaseResult>
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<TestSearchPhaseResult>
listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(shard.currentNodeId());
@ -133,7 +135,7 @@ public class SearchAsyncActionTests extends ESTestCase {
for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.results.get(i);
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node));
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE);
}
responseListener.onResponse(response);
latch.countDown();
@ -154,9 +156,9 @@ public class SearchAsyncActionTests extends ESTestCase {
}
}
private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode,
DiscoveryNode replicaNode) {
ArrayList<ShardIterator> list = new ArrayList<>();
private static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
ArrayList<SearchShardIterator> list = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
ArrayList<ShardRouting> started = new ArrayList<>();
ArrayList<ShardRouting> initializing = new ArrayList<>();
@ -184,9 +186,9 @@ public class SearchAsyncActionTests extends ESTestCase {
}
Collections.shuffle(started, random());
started.addAll(initializing);
list.add(new PlainShardIterator(new ShardId(new Index(index, "_na_"), i), started));
list.add(new SearchShardIterator(new ShardId(new Index(index, "_na_"), i), started, originalIndices));
}
return new GroupShardsIterator(list);
return new GroupShardsIterator<>(list);
}
public static class TestSearchResponse extends SearchResponse {
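A minimal sketch of the helper pattern used by getShardsIter above (sketch class name invented, assumes placement in the org.elasticsearch.action.search package): every shard of an index is wrapped in a SearchShardIterator that shares that index's OriginalIndices before the group is handed to the async action.

package org.elasticsearch.action.search;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;

class ShardGroupSketch {

    static GroupShardsIterator<SearchShardIterator> group(String index, int numShards) {
        // All shards of the same index share one OriginalIndices instance, so shard level
        // requests for that index carry exactly the expressions that resolved to it.
        OriginalIndices originalIndices =
            new OriginalIndices(new String[]{index}, IndicesOptions.strictExpandOpenAndForbidClosed());
        List<SearchShardIterator> list = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            list.add(new SearchShardIterator(new ShardId(new Index(index, "_na_"), i),
                Collections.emptyList(), originalIndices));
        }
        return new GroupShardsIterator<>(list);
    }
}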

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentParser;
@ -42,7 +43,7 @@ public class ShardSearchFailureTests extends ESTestCase {
String indexUuid = randomAlphaOfLengthBetween(5, 10);
int shardId = randomInt();
return new ShardSearchFailure(ex,
new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId)));
new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null));
}
public void testFromXContent() throws IOException {
@ -73,7 +74,7 @@ public class ShardSearchFailureTests extends ESTestCase {
public void testToXContent() throws IOException {
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(0, 0, "some message", null),
new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123)));
new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), OriginalIndices.NONE));
BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean());
assertEquals(
"{\"shard\":123,"

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class TransportSearchActionTests extends ESTestCase {
public void testMergeShardsIterators() throws IOException {
List<ShardIterator> localShardIterators = new ArrayList<>();
{
ShardId shardId = new ShardId("local_index", "local_index_uuid", 0);
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, "local_node", true, STARTED);
ShardIterator shardIterator = new PlainShardIterator(shardId, Collections.singletonList(shardRouting));
localShardIterators.add(shardIterator);
}
{
ShardId shardId2 = new ShardId("local_index_2", "local_index_2_uuid", 1);
ShardRouting shardRouting2 = TestShardRouting.newShardRouting(shardId2, "local_node", true, STARTED);
ShardIterator shardIterator2 = new PlainShardIterator(shardId2, Collections.singletonList(shardRouting2));
localShardIterators.add(shardIterator2);
}
GroupShardsIterator<ShardIterator> localShardsIterator = new GroupShardsIterator<>(localShardIterators);
OriginalIndices localIndices = new OriginalIndices(new String[]{"local_alias", "local_index_2"},
IndicesOptions.strictExpandOpenAndForbidClosed());
OriginalIndices remoteIndices = new OriginalIndices(new String[]{"remote_alias", "remote_index_2"},
IndicesOptions.strictExpandOpen());
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
{
ShardId remoteShardId = new ShardId("remote_index", "remote_index_uuid", 2);
ShardRouting remoteShardRouting = TestShardRouting.newShardRouting(remoteShardId, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator = new SearchShardIterator(remoteShardId,
Collections.singletonList(remoteShardRouting), remoteIndices);
remoteShardIterators.add(remoteShardIterator);
}
{
ShardId remoteShardId2 = new ShardId("remote_index_2", "remote_index_2_uuid", 3);
ShardRouting remoteShardRouting2 = TestShardRouting.newShardRouting(remoteShardId2, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator2 = new SearchShardIterator(remoteShardId2,
Collections.singletonList(remoteShardRouting2), remoteIndices);
remoteShardIterators.add(remoteShardIterator2);
}
OriginalIndices remoteIndices2 = new OriginalIndices(new String[]{"remote_index_3"}, IndicesOptions.strictExpand());
{
ShardId remoteShardId3 = new ShardId("remote_index_3", "remote_index_3_uuid", 4);
ShardRouting remoteShardRouting3 = TestShardRouting.newShardRouting(remoteShardId3, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator3 = new SearchShardIterator(remoteShardId3,
Collections.singletonList(remoteShardRouting3), remoteIndices2);
remoteShardIterators.add(remoteShardIterator3);
}
GroupShardsIterator<SearchShardIterator> searchShardIterators = TransportSearchAction.mergeShardsIterators(localShardsIterator,
localIndices, remoteShardIterators);
assertEquals(searchShardIterators.size(), 5);
int i = 0;
for (SearchShardIterator searchShardIterator : searchShardIterators) {
switch(i++) {
case 0:
assertEquals("local_index", searchShardIterator.shardId().getIndexName());
assertEquals(0, searchShardIterator.shardId().getId());
assertSame(localIndices, searchShardIterator.getOriginalIndices());
break;
case 1:
assertEquals("local_index_2", searchShardIterator.shardId().getIndexName());
assertEquals(1, searchShardIterator.shardId().getId());
assertSame(localIndices, searchShardIterator.getOriginalIndices());
break;
case 2:
assertEquals("remote_index", searchShardIterator.shardId().getIndexName());
assertEquals(2, searchShardIterator.shardId().getId());
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
break;
case 3:
assertEquals("remote_index_2", searchShardIterator.shardId().getIndexName());
assertEquals(3, searchShardIterator.shardId().getId());
assertSame(remoteIndices, searchShardIterator.getOriginalIndices());
break;
case 4:
assertEquals("remote_index_3", searchShardIterator.shardId().getIndexName());
assertEquals(4, searchShardIterator.shardId().getId());
assertSame(remoteIndices2, searchShardIterator.getOriginalIndices());
break;
}
}
}
}

View File

@ -296,7 +296,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
Set<String> set = new HashSet<>();
for (ShardRouting shard : shardIt.asUnordered()) {
for (ShardRouting shard : shardIt) {
set.add(shard.currentNodeId());
}
@ -332,7 +332,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
// the master should not be in the list of nodes that requests were sent to
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
Set<String> set = new HashSet<>();
for (ShardRouting shard : shardIt.asUnordered()) {
for (ShardRouting shard : shardIt) {
if (!shard.currentNodeId().equals(masterNode.getId())) {
set.add(shard.currentNodeId());
}
@ -352,8 +352,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
public void testOperationExecution() throws Exception {
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
Set<ShardRouting> shards = new HashSet<>();
String nodeId = shardIt.asUnordered().iterator().next().currentNodeId();
for (ShardRouting shard : shardIt.asUnordered()) {
String nodeId = shardIt.iterator().next().currentNodeId();
for (ShardRouting shard : shardIt) {
if (nodeId.equals(shard.currentNodeId())) {
shards.add(shard);
}
@ -417,7 +417,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX});
Map<String, List<ShardRouting>> map = new HashMap<>();
for (ShardRouting shard : shardIt.asUnordered()) {
for (ShardRouting shard : shardIt) {
if (!map.containsKey(shard.currentNodeId())) {
map.put(shard.currentNodeId(), new ArrayList<>());
}

View File

@ -165,10 +165,9 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa
private List<ShardStateAction.ShardEntry> createExistingShards(ClusterState currentState, String reason) {
List<ShardRouting> shards = new ArrayList<>();
GroupShardsIterator shardGroups =
currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
GroupShardsIterator<ShardIterator> shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
for (ShardIterator shardIt : shardGroups) {
for (ShardRouting shard : shardIt.asUnordered()) {
for (ShardRouting shard : shardIt) {
shards.add(shard);
}
}

View File

@ -43,7 +43,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
list.add(new PlainShardIterator(new ShardId(index, 0), Arrays.asList(newRouting(index, 0, true))));
list.add(new PlainShardIterator(new ShardId(index, 1), Arrays.asList(newRouting(index, 1, true))));
GroupShardsIterator iter = new GroupShardsIterator(list);
GroupShardsIterator iter = new GroupShardsIterator<>(list);
assertEquals(7, iter.totalSizeWith1ForEmpty());
assertEquals(5, iter.size());
assertEquals(6, iter.totalSize());
@ -67,7 +67,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
Collections.shuffle(list, random());
ArrayList<ShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator iter = new GroupShardsIterator(list);
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
for (ShardIterator shardsIterator : iter) {
actualIterators.add(shardsIterator);
}
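A minimal sketch of the iteration pattern the diff converges on (helper name invented, assumes the Elasticsearch classpath): GroupShardsIterator is generic now, and shard iterators are iterated directly for their ShardRoutings rather than through asUnordered().

import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;

class NodeIdCollectorSketch {

    // Collect the node ids hosting any shard copy in the group, skipping unassigned copies;
    // no asUnordered() hop is needed any more.
    static Set<String> nodeIds(GroupShardsIterator<ShardIterator> groups) {
        Set<String> nodes = new HashSet<>();
        for (ShardIterator shardIt : groups) {
            for (ShardRouting routing : shardIt) {
                if (routing.currentNodeId() != null) {
                    nodes.add(routing.currentNodeId());
                }
            }
        }
        return nodes;
    }
}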

View File

@ -376,7 +376,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0");
GroupShardsIterator<ShardIterator> shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_shards:0");
assertThat(shardIterators.size(), equalTo(1));
assertThat(shardIterators.iterator().next().shardId().id(), equalTo(0));
@ -443,7 +443,7 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
GroupShardsIterator shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
GroupShardsIterator<ShardIterator> shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
ShardIterator iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard

View File

@ -35,7 +35,6 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -52,7 +51,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
@ -73,9 +71,6 @@ import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -292,7 +287,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
assertThat(response.getStatus(), is(ClusterHealthStatus.RED));
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
GroupShardsIterator<ShardIterator> shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
for (ShardIterator iterator : shardIterators) {
ShardRouting routing;
while ((routing = iterator.nextOrNull()) != null) {

View File

@ -150,10 +150,10 @@ public class SuggestStatsIT extends ESIntegTestCase {
private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator.asUnordered()) {
for (ShardRouting routing : shardIterator) {
if (routing.active()) {
nodes.add(routing.currentNodeId());
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search;
import org.apache.lucene.search.Explanation;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -128,7 +129,8 @@ public class SearchHitTests extends ESTestCase {
}
if (randomBoolean()) {
hit.shard(new SearchShardTarget(randomAlphaOfLengthBetween(5, 10),
new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt())));
new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()),
OriginalIndices.NONE));
}
return hit;
}

View File

@ -184,8 +184,8 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
for (int i = 0; i < rounds; i++) {
try {
SearchPhaseResult searchPhaseResult = service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f),
new SearchTask(123L, "", "", "", null));
IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0);
@ -213,16 +213,16 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);
final SearchContext contextWithDefaultTimeout = service.createContext(
new ShardSearchLocalRequest(
indexShard.shardId(),
1,
SearchType.DEFAULT,
new SearchSourceBuilder(),
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
new ShardSearchLocalRequest(
indexShard.shardId(),
1,
SearchType.DEFAULT,
new SearchSourceBuilder(),
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
try {
// the search context should inherit the default timeout
assertThat(contextWithDefaultTimeout.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
@ -233,15 +233,15 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
final long seconds = randomIntBetween(6, 10);
final SearchContext context = service.createContext(
new ShardSearchLocalRequest(
indexShard.shardId(),
1,
SearchType.DEFAULT,
new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)),
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
new ShardSearchLocalRequest(
indexShard.shardId(),
1,
SearchType.DEFAULT,
new SearchSourceBuilder().timeout(TimeValue.timeValueSeconds(seconds)),
new String[0],
false,
new AliasFilter(null, Strings.EMPTY_ARRAY),
1.0f),
null);
try {
// the search context should inherit the query timeout

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.internal;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -95,7 +96,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
} else {
filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY);
}
return new ShardSearchTransportRequest(searchRequest, shardId,
return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), Math.abs(randomLong()));
}
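A minimal sketch of the new shard request wiring (helper name invented, assumes placement in the org.elasticsearch.search.internal package and an already-resolved ShardId): the shard level request is now handed an OriginalIndices built from the top level SearchRequest, so the original index expressions travel with each shard request.

package org.elasticsearch.search.internal;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.shard.ShardId;

class ShardRequestSketch {

    static ShardSearchTransportRequest shardRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards) {
        // The first argument is new: the OriginalIndices derived from the search request
        // are carried explicitly instead of being implied by the request itself.
        return new ShardSearchTransportRequest(new OriginalIndices(searchRequest), searchRequest, shardId,
            numberOfShards, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, System.currentTimeMillis());
    }
}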

View File

@ -165,10 +165,10 @@ public class SearchStatsIT extends ESIntegTestCase {
private Set<String> nodeIdsWithIndex(String... indices) {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
GroupShardsIterator allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
GroupShardsIterator<ShardIterator> allAssignedShardsGrouped = state.routingTable().allAssignedShardsGrouped(indices, true);
Set<String> nodes = new HashSet<>();
for (ShardIterator shardIterator : allAssignedShardsGrouped) {
for (ShardRouting routing : shardIterator.asUnordered()) {
for (ShardRouting routing : shardIterator) {
if (routing.active()) {
nodes.add(routing.currentNodeId());
}