Preserve cluster alias throughout search execution to lookup nodes by cluster and ID (#24438)

today we only lookup nodes by their ID but never by the (clusterAlias, nodeId) tuple.
This could in theory lead to lookups on the wrong cluster if there are 2 clusters
with a node that has the same ID. It's very unlikely to happen but we now can clearly
disambiguate between clusters and their nodes.
This commit is contained in:
Simon Willnauer 2017-05-03 12:03:30 +02:00 committed by GitHub
parent 194742b3f4
commit c99cc8a896
26 changed files with 166 additions and 207 deletions

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
@ -44,7 +43,7 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends InitialSearchPhase<Result>
@ -58,7 +57,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
/**
* Used by subclasses to resolve node ids to DiscoveryNodes.
**/
private final Function<String, Transport.Connection> nodeIdToConnection;
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
@ -71,7 +70,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
@ -210,7 +209,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
results.getSuccessfulResults().forEach((entry) -> {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = nodeIdToConnection.apply(searchShardTarget.getNodeId());
Transport.Connection connection = getConnection(null, searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
@ -273,8 +272,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
}
@Override
public final Transport.Connection getConnection(String nodeId) {
return nodeIdToConnection.apply(nodeId);
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
return nodeIdToConnection.apply(clusterAlias, nodeId);
}
@Override
@ -297,10 +296,10 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
listener.onFailure(e);
}
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) {
AliasFilter filter = aliasFilter.get(shard.index().getUUID());
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis());
}

View File

@ -72,7 +72,7 @@ final class DfsQueryPhase extends SearchPhase {
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(),
dfsResult.getRequestId(), dfs);
final int shardIndex = dfsResult.getShardIndex();

View File

@ -136,7 +136,8 @@ final class FetchSearchPhase extends SearchPhase {
counter.countDown();
} else {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
@ -191,7 +192,7 @@ final class FetchSearchPhase extends SearchPhase {
if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getNodeId());
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);

View File

@ -67,7 +67,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getOriginalIndices());
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
shardIt.getOriginalIndices());
onShardFailure(shardIndex, shardTarget, e);
if (totalOps.incrementAndGet() == expectedTotalOps) {
@ -144,7 +145,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
} else {
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getOriginalIndices()), shardIndex) {
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
onShardResult(result, shardIt);

View File

@ -29,62 +29,33 @@ import org.elasticsearch.transport.Transport;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.BiFunction;
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(
final Logger logger,
final SearchTransportService searchTransportService,
final Function<String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts,
final SearchPhaseController searchPhaseController,
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion,
final SearchTask task) {
super(
"dfs",
logger,
searchTransportService,
nodeIdToConnection,
aliasFilter,
concreteIndexBoosts,
executor,
request,
listener,
shardsIts,
timeProvider,
clusterStateVersion,
task,
new SearchPhaseResults<>(shardsIts.size()));
SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size()));
this.searchPhaseController = searchPhaseController;
}
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard) , getTask(), listener);
getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt) , getTask(), listener);
}
@Override
protected SearchPhase getNextPhase(
final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
return new DfsQueryPhase(
results.results,
searchPhaseController,
(queryResults) ->
new FetchSearchPhase(queryResults, searchPhaseController, context),
context);
protected SearchPhase getNextPhase(final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
return new DfsQueryPhase(results.results, searchPhaseController, (queryResults) ->
new FetchSearchPhase(queryResults, searchPhaseController, context), context);
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
@ -84,7 +83,7 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
* Returns a connection to the node if connected otherwise and {@link org.elasticsearch.transport.ConnectTransportException} will be
* thrown.
*/
Transport.Connection getConnection(String nodeId);
Transport.Connection getConnection(String clusterAlias, String nodeId);
/**
* Returns the {@link SearchTransportService} to send shard request to other nodes
@ -106,7 +105,7 @@ interface SearchPhaseContext extends ActionListener<SearchResponse>, Executor {
/**
* Builds an request for the initial search phase.
*/
ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard);
ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt);
/**
* Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution

View File

@ -29,59 +29,31 @@ import org.elasticsearch.transport.Transport;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.BiFunction;
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPhaseResult> {
private final SearchPhaseController searchPhaseController;
SearchQueryThenFetchAsyncAction(
final Logger logger,
final SearchTransportService searchTransportService,
final Function<String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts,
final SearchPhaseController searchPhaseController,
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion,
SearchTask task) {
super(
"query",
logger,
searchTransportService,
nodeIdToConnection,
aliasFilter,
concreteIndexBoosts,
executor,
request,
listener,
shardsIts,
timeProvider,
clusterStateVersion,
task,
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
this.searchPhaseController = searchPhaseController;
}
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
getSearchTransport().sendExecuteQuery(
getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard),
getTask(),
listener);
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}
@Override
protected SearchPhase getNextPhase(
final SearchPhaseResults<SearchPhaseResult> results,
final SearchPhaseContext context) {
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
}
}

View File

@ -33,6 +33,7 @@ import java.util.List;
public final class SearchShardIterator extends PlainShardIterator {
private final OriginalIndices originalIndices;
private String clusterAlias;
/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
@ -41,9 +42,10 @@ public final class SearchShardIterator extends PlainShardIterator {
* @param shardId shard id of the group
* @param shards shards to iterate
*/
public SearchShardIterator(ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
public SearchShardIterator(String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
super(shardId, shards);
this.originalIndices = originalIndices;
this.clusterAlias = clusterAlias;
}
/**
@ -52,4 +54,8 @@ public final class SearchShardIterator extends PlainShardIterator {
public OriginalIndices getOriginalIndices() {
return originalIndices;
}
public String getClusterAlias() {
return clusterAlias;
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
@ -79,7 +78,7 @@ public class SearchTransportService extends AbstractComponent {
private final TransportService transportService;
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
public SearchTransportService(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
}
@ -390,7 +389,18 @@ public class SearchTransportService extends AbstractComponent {
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
}
Transport.Connection getConnection(DiscoveryNode node) {
/**
* Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
* against the local cluster.
* @param clusterAlias the cluster alias the node should be resolve against
* @param node the node to resolve
* @return a connection to the given node belonging to the cluster with the provided alias.
*/
Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
if (clusterAlias == null) {
return transportService.getConnection(node);
} else {
return transportService.getRemoteClusterService().getConnection(node, clusterAlias);
}
}
}

View File

@ -214,7 +214,7 @@ public class ShardSearchFailure implements ShardOperationFailedException {
}
return new ShardSearchFailure(exception,
new SearchShardTarget(nodeId,
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), OriginalIndices.NONE));
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), null, OriginalIndices.NONE));
}
@Override

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.search;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
@ -59,7 +58,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@ -204,31 +203,32 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(nodeId) -> null, clusterState, Collections.emptyMap(), listener);
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = processRemoteShards(remoteClusterService,
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
clusterNodeLookup, clusterState, remoteAliasFilters, listener);
}, listener::onFailure));
}
}
static Function<String, Transport.Connection> processRemoteShards(RemoteClusterService remoteClusterService,
Map<String, ClusterSearchShardsResponse> searchShardsResponses,
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
Map<String, Map<String, DiscoveryNode>> clusterToNode = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
HashMap<String, DiscoveryNode> idToDiscoveryNode = new HashMap<>();
clusterToNode.put(clusterAlias, idToDiscoveryNode);
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias));
idToDiscoveryNode.put(remoteNode.getId(), remoteNode);
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
@ -240,7 +240,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
remoteIndex.getUUID());
OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null;
SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
@ -254,17 +254,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
}
}
return (nodeId) -> {
Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
if (supplier == null) {
throw new IllegalArgumentException("unknown remote node: " + nodeId);
return (clusterAlias, nodeId) -> {
Map<String, DiscoveryNode> clusterNodes = clusterToNode.get(clusterAlias);
if (clusterNodes == null) {
throw new IllegalArgumentException("unknown remote cluster: " + clusterAlias);
}
return supplier.get();
return clusterNodes.get(nodeId);
};
}
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
@ -312,18 +312,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
final DiscoveryNodes nodes = clusterState.nodes();
Function<String, Transport.Connection> connectionLookup = (nodeId) -> {
final DiscoveryNode discoveryNode = nodes.get(nodeId);
final Transport.Connection connection;
if (discoveryNode != null) {
connection = searchTransportService.getConnection(discoveryNode);
} else {
connection = remoteConnections.apply(nodeId);
}
if (connection == null) {
BiFunction<String, String, Transport.Connection> connectionLookup = (clusterName, nodeId) -> {
final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId);
if (discoveryNode == null) {
throw new IllegalStateException("no node found for id: " + nodeId);
}
return connection;
return searchTransportService.getConnection(clusterName, discoveryNode);
};
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
@ -338,7 +332,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
shards.add(shardIterator);
}
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
return new GroupShardsIterator<>(shards);
}
@ -351,7 +345,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
Function<String, Transport.Connection> connectionLookup,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {

View File

@ -933,5 +933,4 @@ public abstract class StreamInput extends InputStream {
* be a no-op depending on the underlying implementation if the information of the remaining bytes is not present.
*/
protected abstract void ensureCanReadBytes(int length) throws EOFException;
}

View File

@ -412,7 +412,7 @@ public class Node implements Closeable {
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final SearchTransportService searchTransportService = new SearchTransportService(settings,
settingsModule.getClusterSettings(), transportService);
transportService);
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {

View File

@ -545,7 +545,7 @@ public final class SearchHit implements Streamable, ToXContentObject, Iterable<S
ShardId shardId = get(Fields._SHARD, values, null);
String nodeId = get(Fields._NODE, values, null);
if (shardId != null && nodeId != null) {
searchHit.shard(new SearchShardTarget(nodeId, shardId, OriginalIndices.NONE));
searchHit.shard(new SearchShardTarget(nodeId, shardId, null, OriginalIndices.NONE));
}
searchHit.fields(fields);
return searchHit;

View File

@ -500,7 +500,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), OriginalIndices.NONE);
indexShard.shardId(), null, OriginalIndices.NONE);
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,

View File

@ -33,13 +33,14 @@ import java.io.IOException;
/**
* The target that the search request was executed on.
*/
public class SearchShardTarget implements Writeable, Comparable<SearchShardTarget> {
public final class SearchShardTarget implements Writeable, Comparable<SearchShardTarget> {
private final Text nodeId;
private final ShardId shardId;
//original indices are only needed in the coordinating node throughout the search request execution.
//original indices and cluster alias are only needed in the coordinating node throughout the search request execution.
//no need to serialize them as part of SearchShardTarget.
private final transient OriginalIndices originalIndices;
private final transient String clusterAlias;
public SearchShardTarget(StreamInput in) throws IOException {
if (in.readBoolean()) {
@ -49,17 +50,19 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
}
shardId = ShardId.readShardId(in);
this.originalIndices = null;
this.clusterAlias = null;
}
public SearchShardTarget(String nodeId, ShardId shardId, OriginalIndices originalIndices) {
public SearchShardTarget(String nodeId, ShardId shardId, String clusterAlias, OriginalIndices originalIndices) {
this.nodeId = nodeId == null ? null : new Text(nodeId);
this.shardId = shardId;
this.originalIndices = originalIndices;
this.clusterAlias = clusterAlias;
}
//this constructor is only used in tests
public SearchShardTarget(String nodeId, Index index, int shardId) {
this(nodeId, new ShardId(index, shardId), OriginalIndices.NONE);
this(nodeId, new ShardId(index, shardId), null, OriginalIndices.NONE);
}
@Nullable
@ -83,6 +86,10 @@ public class SearchShardTarget implements Writeable, Comparable<SearchShardTarge
return originalIndices;
}
public String getClusterAlias() {
return clusterAlias;
}
@Override
public int compareTo(SearchShardTarget o) {
int i = shardId.getIndexName().compareTo(o.getIndex());

View File

@ -759,10 +759,10 @@ public class ElasticsearchExceptionTests extends ESTestCase {
failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause);
ShardSearchFailure[] shardFailures = new ShardSearchFailure[]{
new ShardSearchFailure(new ParsingException(0, 0, "Parsing g", null),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 61), OriginalIndices.NONE)),
new ShardSearchFailure(new RepositoryException("repository_g", "Repo"),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), OriginalIndices.NONE)),
new ShardSearchFailure(new SearchContextMissingException(0L), null)
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 61), null,
OriginalIndices.NONE)), new ShardSearchFailure(new RepositoryException("repository_g", "Repo"),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null,
OriginalIndices.NONE)), new ShardSearchFailure(new SearchContextMissingException(0L), null)
};
failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures);

View File

@ -19,9 +19,16 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -31,7 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class AbstractSearchAsyncActionTookTests extends ESTestCase {
public class AbstractSearchAsyncActionTests extends ESTestCase {
private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
final boolean controlled,
@ -53,35 +60,19 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
System::nanoTime);
}
return new AbstractSearchAsyncAction<SearchPhaseResult>(
"test",
null,
null,
null,
null,
null,
null,
null,
null,
new GroupShardsIterator<>(Collections.singletonList(new SearchShardIterator(null, Collections.emptyList(), null))),
timeProvider,
0,
null,
null
) {
return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, null,
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList(
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
new InitialSearchPhase.SearchPhaseResults<>(10)) {
@Override
protected SearchPhase getNextPhase(
final SearchPhaseResults<SearchPhaseResult> results,
final SearchPhaseContext context) {
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return null;
}
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
final SearchActionListener<SearchPhaseResult> listener) {
}
@Override
@ -112,4 +103,16 @@ public class AbstractSearchAsyncActionTookTests extends ESTestCase {
assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get())));
}
}
public void testBuildShardSearchTransportRequest() {
final AtomicLong expected = new AtomicLong();
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(false, expected);
SearchShardIterator iterator = new SearchShardIterator("test-cluster", new ShardId(new Index("name", "foo"), 1),
Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand()));
ShardSearchTransportRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator);
assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions());
assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices());
assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.filteringAliases());
assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f);
}
}

View File

@ -23,7 +23,6 @@ import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -59,7 +58,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@ -114,7 +113,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
@ -169,7 +168,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,

View File

@ -57,7 +57,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
.collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
mockSearchPhaseContext.getRequest().source().query(originalQuery);
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -126,7 +126,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
mockSearchPhaseContext.getRequest().source(new SearchSourceBuilder()
.collapse(new CollapseBuilder("someField").setInnerHits(new InnerHitBuilder().setName("foobarbaz"))));
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {
@ -168,7 +168,7 @@ public class ExpandSearchPhaseTests extends ESTestCase {
public void testSkipPhase() throws IOException {
MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1);
mockSearchPhaseContext.searchTransport = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionListener<MultiSearchResponse> listener) {

View File

@ -103,7 +103,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -157,7 +157,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -210,7 +210,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
}
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -271,7 +271,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
AtomicInteger numFetches = new AtomicInteger(0);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
@ -324,7 +324,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
results.consumeResult(queryResult);
SearchTransportService searchTransportService = new SearchTransportService(
Settings.builder().put("search.remote.connect", false).build(), null, null) {
Settings.builder().put("search.remote.connect", false).build(), null) {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.SearchShardTarget;
@ -100,7 +99,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public Transport.Connection getConnection(String nodeId) {
public Transport.Connection getConnection(String clusterAlias, String nodeId) {
return null; // null is ok here for this test
}
@ -111,7 +110,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt, ShardRouting shard) {
public ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
Assert.fail("should not be called");
return null;
}

View File

@ -30,14 +30,12 @@ import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -81,8 +79,7 @@ public class SearchAsyncActionTests extends ESTestCase {
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
Collections.singleton(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS)), null) {
SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) {
numFreedContext.incrementAndGet();
@ -99,7 +96,9 @@ public class SearchAsyncActionTests extends ESTestCase {
"test",
logger,
transportService,
lookup::get,
(cluster, node) -> {
assert cluster == null : "cluster was not null: " + cluster;
return lookup.get(node); },
aliasFilters,
Collections.emptyMap(),
null,
@ -116,7 +115,7 @@ public class SearchAsyncActionTests extends ESTestCase {
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<TestSearchPhaseResult>
listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(shard.currentNodeId());
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
@ -187,7 +186,7 @@ public class SearchAsyncActionTests extends ESTestCase {
}
Collections.shuffle(started, random());
started.addAll(initializing);
list.add(new SearchShardIterator(new ShardId(new Index(index, "_na_"), i), started, originalIndices));
list.add(new SearchShardIterator(null, new ShardId(new Index(index, "_na_"), i), started, originalIndices));
}
return new GroupShardsIterator<>(list);
}

View File

@ -43,7 +43,7 @@ public class ShardSearchFailureTests extends ESTestCase {
String indexUuid = randomAlphaOfLengthBetween(5, 10);
int shardId = randomInt();
return new ShardSearchFailure(ex,
new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null));
new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), shardId), null, null));
}
public void testFromXContent() throws IOException {
@ -74,7 +74,7 @@ public class ShardSearchFailureTests extends ESTestCase {
public void testToXContent() throws IOException {
ShardSearchFailure failure = new ShardSearchFailure(new ParsingException(0, 0, "some message", null),
new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), OriginalIndices.NONE));
new SearchShardTarget("nodeId", new ShardId(new Index("indexName", "indexUuid"), 123), null, OriginalIndices.NONE));
BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean());
assertEquals(
"{\"shard\":123,"

View File

@ -90,14 +90,14 @@ public class TransportSearchActionTests extends ESTestCase {
{
ShardId remoteShardId = new ShardId("remote_index", "remote_index_uuid", 2);
ShardRouting remoteShardRouting = TestShardRouting.newShardRouting(remoteShardId, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator = new SearchShardIterator(remoteShardId,
SearchShardIterator remoteShardIterator = new SearchShardIterator("remote", remoteShardId,
Collections.singletonList(remoteShardRouting), remoteIndices);
remoteShardIterators.add(remoteShardIterator);
}
{
ShardId remoteShardId2 = new ShardId("remote_index_2", "remote_index_2_uuid", 3);
ShardRouting remoteShardRouting2 = TestShardRouting.newShardRouting(remoteShardId2, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator2 = new SearchShardIterator(remoteShardId2,
SearchShardIterator remoteShardIterator2 = new SearchShardIterator("remote", remoteShardId2,
Collections.singletonList(remoteShardRouting2), remoteIndices);
remoteShardIterators.add(remoteShardIterator2);
}
@ -106,7 +106,7 @@ public class TransportSearchActionTests extends ESTestCase {
{
ShardId remoteShardId3 = new ShardId("remote_index_3", "remote_index_3_uuid", 4);
ShardRouting remoteShardRouting3 = TestShardRouting.newShardRouting(remoteShardId3, "remote_node", true, STARTED);
SearchShardIterator remoteShardIterator3 = new SearchShardIterator(remoteShardId3,
SearchShardIterator remoteShardIterator3 = new SearchShardIterator("remote", remoteShardId3,
Collections.singletonList(remoteShardRouting3), remoteIndices2);
remoteShardIterators.add(remoteShardIterator3);
}
@ -188,7 +188,7 @@ public class TransportSearchActionTests extends ESTestCase {
remoteIndicesByCluster.put("test_cluster_2",
new OriginalIndices(new String[]{"x*"}, IndicesOptions.strictExpandOpenAndForbidClosed()));
Map<String, AliasFilter> remoteAliases = new HashMap<>();
TransportSearchAction.processRemoteShards(service, searchShardsResponseMap, remoteIndicesByCluster, iteratorList,
TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList,
remoteAliases);
assertEquals(4, iteratorList.size());
for (SearchShardIterator iterator : iteratorList) {

View File

@ -129,7 +129,7 @@ public class SearchHitTests extends ESTestCase {
}
if (randomBoolean()) {
hit.shard(new SearchShardTarget(randomAlphaOfLengthBetween(5, 10),
new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()),
new ShardId(new Index(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(5, 10)), randomInt()), null,
OriginalIndices.NONE));
}
return hit;