Harden search context id (#53143)

Using a Long alone is not strong enough for the id of search contexts
because we reset the id generator whenever a data node is restarted.
This can lead to two issues:

1. Fetch phase can fetch documents from another index
2. A scroll search can return documents from another index

This commit avoids these issues by adding a UUID to SearchContextId.
Nhat Nguyen 2020-03-09 11:59:30 -04:00
parent 8c4c19d310
commit 6665ebe7ab
50 changed files with 721 additions and 391 deletions
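To make the failure concrete: each data node hands out context ids from an in-memory counter, so after a restart the same long values are reissued, and a stale scroll or fetch request can silently resolve to a context that now belongs to a different index. A minimal sketch of the problem in plain Java (illustrative only, not Elasticsearch code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: the counter restarts at zero with the node,
// so previously issued ids get recycled.
class SearchContextRegistry {
    private final AtomicLong idGenerator = new AtomicLong(); // lost on restart
    private final Map<Long, String> activeContexts = new HashMap<>();

    long openContext(String index) {
        long id = idGenerator.incrementAndGet();
        activeContexts.put(id, index);
        return id;
    }

    String resolve(long id) {
        // A stale id issued before the restart may now map to a context for
        // another index; a bare long gives no way to detect the recycling.
        return activeContexts.get(id);
    }
}
```

The change below instead pairs the counter value with a freshly generated UUID per context, so a recycled long no longer matches and the request fails with SearchContextMissingException rather than returning documents from the wrong index.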

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
@@ -77,7 +78,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final Map<String, Set<String>> indexRoutings;
@@ -103,7 +104,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTimeProvider timeProvider, ClusterState clusterState,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters) {
super(name);
@@ -134,7 +135,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.task = task;
this.listener = listener;
this.nodeIdToConnection = nodeIdToConnection;
this.clusterStateVersion = clusterStateVersion;
this.clusterState = clusterState;
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.indexRoutings = indexRoutings;
@@ -338,7 +339,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
final String resultsFrom = results.getSuccessfulResults()
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterState.version());
}
executePhase(nextPhase);
}
@@ -559,7 +560,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getRequestId(), connection, searchShardTarget.getOriginalIndices());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
@@ -681,4 +682,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
return toExecute;
}
}
protected ClusterState clusterState() {
return clusterState;
}
}

View File

@@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService.CanMatchResponse;
@@ -66,12 +67,12 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
Map<String, Set<String>> indexRoutings,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters) {
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
executor, request, listener, shardsIts, timeProvider, clusterState, task,
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;

View File

@@ -111,7 +111,7 @@ final class ClearScrollController implements Runnable {
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, target.getScrollId(),
searchTransportService.sendFreeContext(connection, target.getContextId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);

View File

@@ -78,7 +78,7 @@ final class DfsQueryPhase extends SearchPhase {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(searchShardTarget.getOriginalIndices(),
dfsResult.getRequestId(), dfs);
dfsResult.getContextId(), dfs);
final int shardIndex = dfsResult.getShardIndex();
searchTransportService.sendExecuteQuery(connection, querySearchRequest, context.getTask(),
new SearchActionListener<QuerySearchResult>(searchShardTarget, shardIndex) {
@@ -96,14 +96,15 @@ final class DfsQueryPhase extends SearchPhase {
public void onFailure(Exception exception) {
try {
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.id()), exception);
querySearchRequest.contextId()), exception);
progressListener.notifyQueryFailure(shardIndex, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(querySearchRequest.id(), connection, searchShardTarget.getOriginalIndices());
context.sendReleaseSearchContext(
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
}
}
});

View File

@@ -22,7 +22,9 @@ import com.carrotsearch.hppc.IntArrayList;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
@@ -30,6 +32,7 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.Transport;
@@ -50,17 +53,21 @@ final class FetchSearchPhase extends SearchPhase {
private final Logger logger;
private final SearchPhaseResults<SearchPhaseResult> resultConsumer;
private final SearchProgressListener progressListener;
private final ClusterState clusterState;
FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
SearchPhaseContext context) {
this(resultConsumer, searchPhaseController, context,
SearchPhaseContext context,
ClusterState clusterState) {
this(resultConsumer, searchPhaseController, context, clusterState,
(response, scrollId) -> new ExpandSearchPhase(context, response, scrollId));
}
FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
SearchPhaseContext context, BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
SearchPhaseContext context,
ClusterState clusterState,
BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
super("fetch");
if (context.getNumShards() != resultConsumer.getNumShards()) {
throw new IllegalStateException("number of shards must match the length of the query results but doesn't:"
@@ -74,6 +81,7 @@ final class FetchSearchPhase extends SearchPhase {
this.logger = context.getLogger();
this.resultConsumer = resultConsumer;
this.progressListener = context.getTask().getProgressListener();
this.clusterState = clusterState;
}
@Override
@@ -97,8 +105,14 @@ final class FetchSearchPhase extends SearchPhase {
private void innerRun() throws IOException {
final int numShards = context.getNumShards();
final boolean isScrollSearch = context.getRequest().scroll() != null;
List<SearchPhaseResult> phaseResults = queryResults.asList();
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
final List<SearchPhaseResult> phaseResults = queryResults.asList();
final String scrollId;
if (isScrollSearch) {
final boolean includeContextUUID = clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_8_0_0);
scrollId = TransportSearchHelper.buildScrollId(queryResults, includeContextUUID);
} else {
scrollId = null;
}
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
final boolean queryAndFetchOptimization = queryResults.length() == 1;
final Runnable finishPhase = ()
@@ -143,7 +157,7 @@ final class FetchSearchPhase extends SearchPhase {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId());
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry,
lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
connection);
@@ -153,10 +167,10 @@ final class FetchSearchPhase extends SearchPhase {
}
}
protected ShardFetchSearchRequest createFetchRequest(long queryId, int index, IntArrayList entry,
protected ShardFetchSearchRequest createFetchRequest(SearchContextId contextId, int index, IntArrayList entry,
ScoreDoc[] lastEmittedDocPerShard, OriginalIndices originalIndices) {
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[index] : null;
return new ShardFetchSearchRequest(originalIndices, queryId, entry, lastEmittedDoc);
return new ShardFetchSearchRequest(originalIndices, contextId, entry, lastEmittedDoc);
}
private void executeFetch(final int shardIndex, final SearchShardTarget shardTarget,
@@ -178,7 +192,8 @@ final class FetchSearchPhase extends SearchPhase {
@Override
public void onFailure(Exception e) {
try {
logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
logger.debug(
() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.contextId()), e);
progressListener.notifyFetchFailure(shardIndex, e);
counter.onFailure(shardIndex, shardTarget, e);
} finally {
@@ -201,7 +216,7 @@ final class FetchSearchPhase extends SearchPhase {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
context.sendReleaseSearchContext(queryResult.getRequestId(), connection, searchShardTarget.getOriginalIndices());
context.sendReleaseSearchContext(queryResult.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);
}
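This is also why the signatures in the earlier files now thread the full ClusterState through instead of just clusterStateVersion: FetchSearchPhase needs clusterState.nodes().getMinNodeVersion() to decide whether every node can already parse the UUID-bearing scroll-id format. A hedged restatement of the guard in innerRun:

```java
// Rolling-upgrade guard: embed the context UUID in scroll ids only once no
// pre-8.0 node, which could not parse the new format, remains in the cluster.
final boolean includeContextUUID =
    clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_8_0_0);
final String scrollId = isScrollSearch
    ? TransportSearchHelper.buildScrollId(queryResults, includeContextUUID)
    : null;
```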

View File

@@ -20,16 +20,17 @@
package org.elasticsearch.action.search;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.internal.SearchContextId;
class ScrollIdForNode {
private final String node;
private final long scrollId;
private final SearchContextId contextId;
private final String clusterAlias;
ScrollIdForNode(@Nullable String clusterAlias, String node, long scrollId) {
ScrollIdForNode(@Nullable String clusterAlias, String node, SearchContextId contextId) {
this.node = node;
this.clusterAlias = clusterAlias;
this.scrollId = scrollId;
this.contextId = contextId;
}
public String getNode() {
@@ -41,15 +42,15 @@ class ScrollIdForNode {
return clusterAlias;
}
public long getScrollId() {
return scrollId;
public SearchContextId getContextId() {
return contextId;
}
@Override
public String toString() {
return "ScrollIdForNode{" +
"node='" + node + '\'' +
", scrollId=" + scrollId +
", scrollId=" + contextId +
", clusterAlias='" + clusterAlias + '\'' +
'}';
}

View File

@@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -37,15 +38,17 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
final ClusterState clusterState, final SearchTask task, SearchResponse.Clusters clusters) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}
@@ -60,6 +63,6 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
return new DfsQueryPhase(results.getAtomicArray(), searchPhaseController, (queryResults) ->
new FetchSearchPhase(queryResults, searchPhaseController, context), context);
new FetchSearchPhase(queryResults, searchPhaseController, context, clusterState()), context);
}
}

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.transport.Transport;
@@ -96,11 +97,11 @@ interface SearchPhaseContext extends Executor {
/**
* Releases a search context with the given context ID on the node the given connection is connected to.
* @see org.elasticsearch.search.query.QuerySearchResult#getRequestId()
* @see org.elasticsearch.search.fetch.FetchSearchResult#getRequestId()
* @see org.elasticsearch.search.query.QuerySearchResult#getContextId()
* @see org.elasticsearch.search.fetch.FetchSearchResult#getContextId()
*
*/
default void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) {
default void sendReleaseSearchContext(SearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
if (connection != null) {
getSearchTransport().sendFreeContext(connection, contextId, originalIndices);
}

View File

@@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
@@ -39,14 +40,16 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
private final SearchProgressListener progressListener;
SearchQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection, final Map<String, AliasFilter> aliasFilter,
final BiFunction<String, String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts, final Map<String, Set<String>> indexRoutings,
final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
final GroupShardsIterator<SearchShardIterator> shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
executor, request, listener, shardsIts, timeProvider, clusterState, task,
searchPhaseController.newSearchPhaseResults(task.getProgressListener(), request, shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
@@ -70,6 +73,6 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
@Override
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
return new FetchSearchPhase(results, searchPhaseController, context, clusterState());
}
}

View File

@@ -31,6 +31,7 @@ import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
@@ -147,11 +148,11 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
}
connection = getConnection(target.getClusterAlias(), node);
} catch (Exception ex) {
onShardFailure("query", counter, target.getScrollId(),
onShardFailure("query", counter, target.getContextId(),
ex, null, () -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup));
continue;
}
final InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request);
final InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getContextId(), request);
// we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
// we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
// from the target node instead...that's why we pass null here
@@ -191,7 +192,7 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
@Override
public void onFailure(Exception t) {
onShardFailure("query", counter, target.getScrollId(), t, null,
onShardFailure("query", counter, target.getContextId(), t, null,
() -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup));
}
};
@@ -247,7 +248,7 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
}
}
protected void onShardFailure(String phaseName, final CountDown counter, final long searchId, Exception failure,
protected void onShardFailure(String phaseName, final CountDown counter, final SearchContextId searchId, Exception failure,
@Nullable SearchShardTarget searchShardTarget,
Supplier<SearchPhase> nextPhaseSupplier) {
if (logger.isDebugEnabled()) {

View File

@@ -86,7 +86,7 @@ final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncActio
if (docIds != null) {
final QuerySearchResult querySearchResult = queryResults.get(index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.getRequestId(), docIds,
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.getContextId(), docIds,
lastEmittedDoc);
SearchShardTarget searchShardTarget = querySearchResult.getSearchShardTarget();
DiscoveryNode node = clusterNodeLookup.apply(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -104,7 +104,7 @@ final class SearchScrollQueryThenFetchAsyncActio
@Override
public void onFailure(Exception t) {
onShardFailure(getName(), counter, querySearchResult.getRequestId(),
onShardFailure(getName(), counter, querySearchResult.getContextId(),
t, querySearchResult.getSearchShardTarget(),
() -> sendResponsePhase(reducedQueryPhase, fetchResults));
}

View File

@@ -40,6 +40,7 @@ import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
@@ -87,7 +88,7 @@ public class SearchTransportService {
this.responseWrapper = responseWrapper;
}
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
public void sendFreeContext(Transport.Connection connection, final SearchContextId contextId, OriginalIndices originalIndices) {
transportService.sendRequest(connection, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
@@ -102,7 +103,8 @@ public class SearchTransportService {
}, SearchFreeContextResponse::new));
}
public void sendFreeContext(Transport.Connection connection, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
public void sendFreeContext(Transport.Connection connection, SearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener) {
transportService.sendRequest(connection, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
}
@@ -194,28 +196,25 @@ public class SearchTransportService {
}
static class ScrollFreeContextRequest extends TransportRequest {
private long id;
private SearchContextId contextId;
ScrollFreeContextRequest() {
}
ScrollFreeContextRequest(long id) {
this.id = id;
ScrollFreeContextRequest(SearchContextId contextId) {
this.contextId = contextId;
}
ScrollFreeContextRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
contextId = new SearchContextId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
contextId.writeTo(out);
}
public long id() {
return this.id;
public SearchContextId id() {
return this.contextId;
}
}
@@ -223,10 +222,7 @@ public class SearchTransportService {
static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private OriginalIndices originalIndices;
SearchFreeContextRequest() {
}
SearchFreeContextRequest(OriginalIndices originalIndices, long id) {
SearchFreeContextRequest(OriginalIndices originalIndices, SearchContextId id) {
super(id);
this.originalIndices = originalIndices;
}

View File

@@ -511,7 +511,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}
@@ -560,7 +560,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
long clusterStateVersion,
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
@@ -571,14 +571,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task, (iter) -> {
timeProvider, clusterState, task, (iter) -> {
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
task,
searchRequest,
iter,
timeProvider,
connectionLookup,
clusterStateVersion,
clusterState,
aliasFilter,
concreteIndexBoosts,
indexRoutings,
@@ -598,12 +598,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
shardIterators, timeProvider, clusterState, task, clusters);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, searchRequest, listener,
shardIterators, timeProvider, clusterStateVersion, task, clusters);
shardIterators, timeProvider, clusterState, task, clusters);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");

View File

@@ -25,6 +25,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.transport.RemoteClusterAware;
import java.io.IOException;
@@ -32,16 +33,25 @@ import java.util.Base64;
final class TransportSearchHelper {
static InternalScrollSearchRequest internalScrollSearchRequest(long id, SearchScrollRequest request) {
private static final String INCLUDE_CONTEXT_UUID = "include_context_uuid";
static InternalScrollSearchRequest internalScrollSearchRequest(SearchContextId id, SearchScrollRequest request) {
return new InternalScrollSearchRequest(request, id);
}
static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhaseResults) throws IOException {
static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhaseResults,
boolean includeContextUUID) throws IOException {
try (RAMOutputStream out = new RAMOutputStream()) {
if (includeContextUUID) {
out.writeString(INCLUDE_CONTEXT_UUID);
}
out.writeString(searchPhaseResults.length() == 1 ? ParsedScrollId.QUERY_AND_FETCH_TYPE : ParsedScrollId.QUERY_THEN_FETCH_TYPE);
out.writeVInt(searchPhaseResults.asList().size());
for (SearchPhaseResult searchPhaseResult : searchPhaseResults.asList()) {
out.writeLong(searchPhaseResult.getRequestId());
if (includeContextUUID) {
out.writeString(searchPhaseResult.getContextId().getReaderId());
}
out.writeLong(searchPhaseResult.getContextId().getId());
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
if (searchShardTarget.getClusterAlias() != null) {
out.writeString(
@@ -60,9 +70,19 @@ final class TransportSearchHelper {
try {
byte[] bytes = Base64.getUrlDecoder().decode(scrollId);
ByteArrayDataInput in = new ByteArrayDataInput(bytes);
String type = in.readString();
final boolean includeContextUUID;
final String type;
final String firstChunk = in.readString();
if (INCLUDE_CONTEXT_UUID.equals(firstChunk)) {
includeContextUUID = true;
type = in.readString();
} else {
includeContextUUID = false;
type = firstChunk;
}
ScrollIdForNode[] context = new ScrollIdForNode[in.readVInt()];
for (int i = 0; i < context.length; ++i) {
final String contextUUID = includeContextUUID ? in.readString() : "";
long id = in.readLong();
String target = in.readString();
String clusterAlias;
@@ -73,7 +93,7 @@ final class TransportSearchHelper {
clusterAlias = target.substring(0, index);
target = target.substring(index+1);
}
context[i] = new ScrollIdForNode(clusterAlias, target, id);
context[i] = new ScrollIdForNode(clusterAlias, target, new SearchContextId(contextUUID, id));
}
if (in.getPosition() != bytes.length) {
throw new IllegalArgumentException("Not all bytes were read");
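The sentinel string makes the new scroll-id format self-describing: old ids begin with the type, new ones with include_context_uuid, so a single parser handles both during an upgrade. A simplified, self-contained sketch of the round trip (one context only, java.io streams standing in for Lucene's RAMOutputStream/ByteArrayDataInput used above):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Base64;

final class ScrollIdFormatSketch {
    private static final String INCLUDE_CONTEXT_UUID = "include_context_uuid";

    static String encode(String type, String readerId, long id, boolean includeUuid) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (includeUuid) {
            out.writeUTF(INCLUDE_CONTEXT_UUID); // sentinel; old ids never start with it
        }
        out.writeUTF(type);
        if (includeUuid) {
            out.writeUTF(readerId);
        }
        out.writeLong(id);
        return Base64.getUrlEncoder().encodeToString(bytes.toByteArray());
    }

    static Object[] decode(String scrollId) throws IOException {
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(Base64.getUrlDecoder().decode(scrollId)));
        String first = in.readUTF();
        boolean hasUuid = INCLUDE_CONTEXT_UUID.equals(first);
        String type = hasUuid ? in.readUTF() : first;  // old format: type comes first
        String readerId = hasUuid ? in.readUTF() : ""; // old format: no UUID on the wire
        long id = in.readLong();
        return new Object[] {type, readerId, id};
    }
}
```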

View File

@@ -64,6 +64,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
@@ -85,7 +86,7 @@ import java.util.function.LongSupplier;
final class DefaultSearchContext extends SearchContext {
private final long id;
private final SearchContextId id;
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final LongSupplier relativeTimeSupplier;
@@ -156,7 +157,7 @@ final class DefaultSearchContext extends SearchContext {
private final QueryShardContext queryShardContext;
private final FetchPhase fetchPhase;
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
DefaultSearchContext(SearchContextId id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
FetchPhase fetchPhase, Version minNodeVersion) throws IOException {
@@ -317,7 +318,7 @@ final class DefaultSearchContext extends SearchContext {
}
@Override
public long id() {
public SearchContextId id() {
return this.id;
}

View File

@@ -23,20 +23,21 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.internal.SearchContextId;
import java.io.IOException;
public class SearchContextMissingException extends ElasticsearchException {
private final long id;
private final SearchContextId contextId;
public SearchContextMissingException(long id) {
super("No search context found for id [" + id + "]");
this.id = id;
public SearchContextMissingException(SearchContextId contextId) {
super("No search context found for id [" + contextId.getId() + "]");
this.contextId = contextId;
}
public long id() {
return this.id;
public SearchContextId contextId() {
return this.contextId;
}
@Override
@@ -46,12 +47,12 @@ public class SearchContextMissingException extends ElasticsearchException {
public SearchContextMissingException(StreamInput in) throws IOException{
super(in);
id = in.readLong();
contextId = new SearchContextId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
contextId.writeTo(out);
}
}

View File

@@ -22,6 +22,7 @@ package org.elasticsearch.search;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.transport.TransportResponse;
@@ -39,7 +40,7 @@ public abstract class SearchPhaseResult extends TransportResponse {
private SearchShardTarget searchShardTarget;
private int shardIndex = -1;
protected long requestId;
protected SearchContextId contextId;
protected SearchPhaseResult() {
@@ -50,10 +51,10 @@ public abstract class SearchPhaseResult extends TransportResponse {
}
/**
* Returns the results request ID that is used to reference the search context on the executing node
* Returns the search context ID that is used to reference the search context on the executing node
*/
public long getRequestId() {
return requestId;
public SearchContextId getContextId() {
return contextId;
}
/**

View File

@@ -32,6 +32,7 @@ import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -83,6 +84,7 @@ import org.elasticsearch.search.fetch.subphase.FetchDocValuesContext;
import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
@@ -284,7 +286,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
protected void putContext(SearchContext context) {
final SearchContext previous = activeContexts.put(context.id(), context);
final SearchContext previous = activeContexts.put(context.id().getId(), context);
assert previous == null;
}
@@ -412,8 +414,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
listener.onResponse(QuerySearchResult.nullInstance());
}
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
getExecutor(id).execute(ActionRunnable.supply(listener, executable::get));
private <T> void runAsync(SearchContextId contextId, Supplier<T> executable, ActionListener<T> listener) {
getExecutor(contextId).execute(ActionRunnable.supply(listener, executable::get));
}
private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception {
@@ -468,8 +470,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public void executeQueryPhase(InternalScrollSearchRequest request,
SearchShardTask task,
ActionListener<ScrollQuerySearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
runAsync(request.contextId(), () -> {
final SearchContext context = findContext(request.contextId(), request);
context.incRef();
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
context.setTask(task);
@@ -490,8 +492,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
runAsync(request.contextId(), () -> {
final SearchContext context = findContext(request.contextId(), request);
context.setTask(task);
context.incRef();
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
@@ -526,10 +528,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
final Executor getExecutor(long id) {
SearchContext context = activeContexts.get(id);
final Executor getExecutor(SearchContextId contextId) {
SearchContext context = getContext(contextId);
if (context == null) {
throw new SearchContextMissingException(id);
throw new SearchContextMissingException(contextId);
}
return getExecutor(context.indexShard());
}
@@ -541,8 +544,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task,
ActionListener<ScrollQueryFetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
runAsync(request.contextId(), () -> {
final SearchContext context = findContext(request.contextId(), request);
context.setTask(task);
context.incRef();
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
@@ -563,8 +566,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
runAsync(request.contextId(), () -> {
final SearchContext context = findContext(request.contextId(), request);
context.incRef();
try {
context.setTask(task);
@@ -576,7 +579,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(request.id());
freeContext(request.contextId());
} else {
contextProcessedSuccessfully(context);
}
@@ -593,10 +596,21 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}, listener);
}
private SearchContext findContext(long id, TransportRequest request) throws SearchContextMissingException {
SearchContext context = activeContexts.get(id);
private SearchContext getContext(SearchContextId contextId) {
final SearchContext context = activeContexts.get(contextId.getId());
if (context == null) {
throw new SearchContextMissingException(id);
return null;
}
if (context.id().getReaderId().equals(contextId.getReaderId()) || contextId.getReaderId().isEmpty()) {
return context;
}
return null;
}
private SearchContext findContext(SearchContextId contextId, TransportRequest request) throws SearchContextMissingException {
final SearchContext context = getContext(contextId);
if (context == null) {
throw new SearchContextMissingException(contextId);
}
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
@@ -704,7 +718,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
IndexShard indexShard = indexService.getShard(request.shardId().getId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().getId(),
indexShard.shardId(), request.getClusterAlias(), OriginalIndices.NONE);
DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
// TODO: If no changes are made since the last commit, and the searcher is opened from that commit, then we can use the
// commit_id as the context_id. And if the local checkpoint and max_seq_no of that commit equal the global checkpoint,
// then we can use a combination of history_uuid and one of these values as a **weaker** context_id.
// Reader contexts with the same commit_id can be replaced at any time, as the Lucene doc ids are the same.
// Reader contexts with the same seq_id, however, can't be replaced between the query and fetch phase because
// the Lucene doc ids can be different.
final String readerId = UUIDs.base64UUID();
final SearchContextId searchContextId = new SearchContextId(readerId, idGenerator.incrementAndGet());
DefaultSearchContext searchContext = new DefaultSearchContext(searchContextId, request, shardTarget,
searcher, clusterService, indexService, indexShard, bigArrays, threadPool::relativeTimeInMillis, timeout,
fetchPhase, clusterService.state().nodes().getMinNodeVersion());
success = true;
@@ -719,6 +741,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
private void freeAllContextForIndex(Index index) {
assert index != null;
for (SearchContext ctx : activeContexts.values()) {
@@ -728,20 +751,21 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
}
public boolean freeContext(long id) {
try (SearchContext context = removeContext(id)) {
public boolean freeContext(SearchContextId contextId) {
if (getContext(contextId) != null) {
try (SearchContext context = removeContext(contextId.getId())) {
if (context != null) {
onFreeContext(context);
return true;
}
return false;
}
}
return false;
}
private void onFreeContext(SearchContext context) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
assert activeContexts.containsKey(context.id()) == false;
assert activeContexts.containsKey(context.id().getId()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
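The lookup above is deliberately lenient in one direction only: the long must match (it is still the key of activeContexts), and the UUIDs must match too unless the incoming id carries no UUID at all, which is what ids serialized by pre-8.0 nodes look like. The acceptance rule in isolation, as a hedged sketch:

```java
// Reject ids whose long was recycled by a later node session (different UUID),
// but keep accepting UUID-less ids sent by older nodes during an upgrade.
static boolean readerIdMatches(SearchContextId stored, SearchContextId requested) {
    return stored.getReaderId().equals(requested.getReaderId())
        || requested.getReaderId().isEmpty();
}
```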

View File

@@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import java.io.IOException;
@@ -45,7 +46,7 @@ public class DfsSearchResult extends SearchPhaseResult {
public DfsSearchResult(StreamInput in) throws IOException {
super(in);
requestId = in.readLong();
contextId = new SearchContextId(in);
int termsSize = in.readVInt();
if (termsSize == 0) {
terms = EMPTY_TERMS;
@@ -61,9 +62,9 @@
maxDoc = in.readVInt();
}
public DfsSearchResult(long id, SearchShardTarget shardTarget) {
public DfsSearchResult(SearchContextId contextId, SearchShardTarget shardTarget) {
this.setSearchShardTarget(shardTarget);
this.requestId = id;
this.contextId = contextId;
}
public DfsSearchResult maxDoc(int maxDoc) {
@@ -100,7 +101,7 @@
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(requestId);
contextId.writeTo(out);
out.writeVInt(terms.length);
for (Term term : terms) {
out.writeString(term.field());

View File

@@ -25,6 +25,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import java.io.IOException;
@@ -40,12 +41,12 @@ public final class FetchSearchResult extends SearchPhaseResult {
public FetchSearchResult(StreamInput in) throws IOException {
super(in);
requestId = in.readLong();
contextId = new SearchContextId(in);
hits = new SearchHits(in);
}
public FetchSearchResult(long id, SearchShardTarget shardTarget) {
this.requestId = id;
public FetchSearchResult(SearchContextId id, SearchShardTarget shardTarget) {
this.contextId = id;
setSearchShardTarget(shardTarget);
}
@@ -86,7 +87,7 @@ public final class FetchSearchResult extends SearchPhaseResult {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(requestId);
contextId.writeTo(out);
hits.writeTo(out);
}
}

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import java.io.IOException;
@@ -44,8 +45,8 @@ public final class QueryFetchSearchResult extends SearchPhaseResult {
}
@Override
public long getRequestId() {
return queryResult.getRequestId();
public SearchContextId getContextId() {
return queryResult.getContextId();
}
@Override

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
@@ -39,7 +40,7 @@ import java.util.Map;
*/
public class ShardFetchRequest extends TransportRequest {
private long id;
private SearchContextId contextId;
private int[] docIds;
@@ -50,8 +51,8 @@ public class ShardFetchRequest extends TransportRequest {
public ShardFetchRequest() {
}
public ShardFetchRequest(long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
this.id = id;
public ShardFetchRequest(SearchContextId contextId, IntArrayList list, ScoreDoc lastEmittedDoc) {
this.contextId = contextId;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
@@ -59,7 +60,7 @@
public ShardFetchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
contextId = new SearchContextId(in);
size = in.readVInt();
docIds = new int[size];
for (int i = 0; i < size; i++) {
@@ -78,7 +79,7 @@
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
contextId.writeTo(out);
out.writeVInt(size);
for (int i = 0; i < size; i++) {
out.writeVInt(docIds[i]);
@@ -94,8 +95,8 @@
}
}
public long id() {
return id;
public SearchContextId contextId() {
return contextId;
}
public int[] docIds() {
@@ -117,7 +118,7 @@
@Override
public String getDescription() {
return "id[" + id + "], size[" + size + "], lastEmittedDoc[" + lastEmittedDoc + "]";
return "id[" + contextId + "], size[" + size + "], lastEmittedDoc[" + lastEmittedDoc + "]";
}
}

View File

@@ -26,6 +26,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.internal.SearchContextId;
import java.io.IOException;
@@ -41,7 +42,7 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
}
public ShardFetchSearchRequest(OriginalIndices originalIndices, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
public ShardFetchSearchRequest(OriginalIndices originalIndices, SearchContextId id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(id, list, lastEmittedDoc);
this.originalIndices = originalIndices;
}

View File

@@ -106,7 +106,7 @@ public abstract class FilteredSearchContext extends SearchContext {
}
@Override
public long id() {
public SearchContextId id() {
return in.id();
}

View File

@@ -33,33 +33,33 @@ import java.util.Map;
public class InternalScrollSearchRequest extends TransportRequest {
private long id;
private SearchContextId contextId;
private Scroll scroll;
public InternalScrollSearchRequest() {
}
public InternalScrollSearchRequest(SearchScrollRequest request, long id) {
this.id = id;
public InternalScrollSearchRequest(SearchScrollRequest request, SearchContextId contextId) {
this.contextId = contextId;
this.scroll = request.scroll();
}
public InternalScrollSearchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
contextId = new SearchContextId(in);
scroll = in.readOptionalWriteable(Scroll::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
contextId.writeTo(out);
out.writeOptionalWriteable(scroll);
}
public long id() {
return id;
public SearchContextId contextId() {
return contextId;
}
public Scroll scroll() {
@@ -78,7 +78,7 @@ public class InternalScrollSearchRequest extends TransportRequest {
@Override
public String getDescription() {
return "id[" + id + "], scroll[" + scroll + "]";
return "id[" + contextId.getId() + "], scroll[" + scroll + "]";
}
}

View File

@@ -132,7 +132,7 @@ public abstract class SearchContext extends AbstractRefCounted implements Releas
* alias filters, types filters, etc. */
public abstract Query buildFilteredQuery(Query query);
public abstract long id();
public abstract SearchContextId id();
public abstract String source();

View File

@@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.internal;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Objects;
public final class SearchContextId implements Writeable {
private final String readerId;
private final long id;
public SearchContextId(String readerId, long id) {
this.readerId = Objects.requireNonNull(readerId);
this.id = id;
}
public SearchContextId(StreamInput in) throws IOException {
this.id = in.readLong();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.readerId = in.readString();
} else {
this.readerId = "";
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(id);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeString(readerId);
}
}
public String getReaderId() {
return readerId;
}
public long getId() {
return id;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchContextId other = (SearchContextId) o;
return id == other.id && readerId.equals(other.readerId);
}
@Override
public int hashCode() {
return Objects.hash(readerId, id);
}
@Override
public String toString() {
return "[" + readerId + "][" + id + "]";
}
}
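The version checks in the stream constructor and writeTo mean the UUID only travels between 8.0+ nodes; on an older wire the id silently degrades to the bare long, producing exactly the empty readerId that the lookup in SearchService tolerates. A sketch of that degradation, assuming the usual BytesStreamOutput/setVersion test idiom from the codebase (the 7.x version constant is illustrative):

```java
SearchContextId original = new SearchContextId(UUIDs.base64UUID(), 42L);

BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(Version.V_7_6_0);         // pre-8.0 wire: only the long is written
original.writeTo(out);

StreamInput in = out.bytes().streamInput();
in.setVersion(Version.V_7_6_0);
SearchContextId received = new SearchContextId(in);

assert received.getId() == 42L;          // the numeric id survives
assert received.getReaderId().isEmpty(); // the UUID is lost on the old wire
```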

View File

@@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;
@@ -36,24 +37,21 @@
public class QuerySearchRequest extends TransportRequest implements IndicesRequest {
private long id;
private final SearchContextId contextId;
private AggregatedDfs dfs;
private final AggregatedDfs dfs;
private OriginalIndices originalIndices;
private final OriginalIndices originalIndices;
public QuerySearchRequest() {
}
public QuerySearchRequest(OriginalIndices originalIndices, long id, AggregatedDfs dfs) {
this.id = id;
public QuerySearchRequest(OriginalIndices originalIndices, SearchContextId contextId, AggregatedDfs dfs) {
this.contextId = contextId;
this.dfs = dfs;
this.originalIndices = originalIndices;
}
public QuerySearchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
contextId = new SearchContextId(in);
dfs = new AggregatedDfs(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@@ -61,13 +59,13 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
contextId.writeTo(out);
dfs.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
public long id() {
return id;
public SearchContextId contextId() {
return contextId;
}
public AggregatedDfs dfs() {
@@ -92,7 +90,7 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
public String getDescription() {
StringBuilder sb = new StringBuilder();
sb.append("id[");
sb.append(id);
sb.append(contextId);
sb.append("], ");
sb.append("indices[");
Strings.arrayToDelimitedString(originalIndices.indices(), ",", sb);

View File

@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
@@ -77,13 +78,13 @@ public final class QuerySearchResult extends SearchPhaseResult {
isNull = false;
}
if (isNull == false) {
long id = in.readLong();
SearchContextId id = new SearchContextId(in);
readFromWithId(id, in);
}
}
public QuerySearchResult(long id, SearchShardTarget shardTarget) {
this.requestId = id;
public QuerySearchResult(SearchContextId id, SearchShardTarget shardTarget) {
this.contextId = id;
setSearchShardTarget(shardTarget);
isNull = false;
}
@ -298,8 +299,8 @@ public final class QuerySearchResult extends SearchPhaseResult {
return hasScoreDocs || hasSuggestHits();
}
public void readFromWithId(long id, StreamInput in) throws IOException {
this.requestId = id;
public void readFromWithId(SearchContextId id, StreamInput in) throws IOException {
this.contextId = id;
from = in.readVInt();
size = in.readVInt();
int numSortFieldsPlus1 = in.readVInt();
@ -349,7 +350,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
out.writeBoolean(isNull);
}
if (isNull == false) {
out.writeLong(requestId);
contextId.writeTo(out);
writeToNoId(out);
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.coordination.NoMasterBlockService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -56,6 +57,7 @@ import org.elasticsearch.script.ScriptException;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
@ -811,7 +813,8 @@ public class ElasticsearchExceptionTests extends ESTestCase {
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 61), null,
OriginalIndices.NONE)), new ShardSearchFailure(new RepositoryException("repository_g", "Repo"),
new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null,
OriginalIndices.NONE)), new ShardSearchFailure(new SearchContextMissingException(0L), null)
OriginalIndices.NONE)), new ShardSearchFailure(
new SearchContextMissingException(new SearchContextId(UUIDs.randomBase64UUID(), 0L)), null)
};
failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures);

View File

@ -79,6 +79,7 @@ import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
@ -120,6 +121,7 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.elasticsearch.test.TestSearchContext.SHARD_TARGET;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class ExceptionSerializationTests extends ESTestCase {
@ -351,9 +353,15 @@ public class ExceptionSerializationTests extends ESTestCase {
}
public void testSearchContextMissingException() throws IOException {
long id = randomLong();
SearchContextMissingException ex = serialize(new SearchContextMissingException(id));
assertEquals(id, ex.id());
SearchContextId contextId = new SearchContextId(UUIDs.randomBase64UUID(), randomLong());
Version version = VersionUtils.randomVersion(random());
SearchContextMissingException ex = serialize(new SearchContextMissingException(contextId), version);
assertThat(ex.contextId().getId(), equalTo(contextId.getId()));
if (version.onOrAfter(Version.V_8_0_0)) {
assertThat(ex.contextId().getReaderId(), equalTo(contextId.getReaderId()));
} else {
assertThat(ex.contextId().getReaderId(), equalTo(""));
}
}
public void testCircuitBreakingException() throws IOException {

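For context, the serialize(exception, version) call above is existing test scaffolding; conceptually it performs a version-pinned exception round trip along these lines (an assumed sketch, not the helper's actual body):

// Assumed sketch of what serialize(ex, version) does under the hood.
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);                                           // encode for a `version` node
out.writeException(new SearchContextMissingException(contextId));
StreamInput in = out.bytes().streamInput();
in.setVersion(version);                                            // decode as from a `version` node
SearchContextMissingException copy = in.readException();

Streams older than V_8_0_0 never carry the readerId, which is why the else branch asserts the empty-string default.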
View File

@ -22,8 +22,10 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
@ -33,6 +35,7 @@ import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
@ -55,7 +58,7 @@ import static org.hamcrest.Matchers.instanceOf;
public class AbstractSearchAsyncActionTests extends ESTestCase {
private final List<Tuple<String, String>> resolvedNodes = new ArrayList<>();
private final Set<Long> releasedContexts = new CopyOnWriteArraySet<>();
private final Set<SearchContextId> releasedContexts = new CopyOnWriteArraySet<>();
private AbstractSearchAsyncAction<SearchPhaseResult> createAction(SearchRequest request,
ArraySearchPhaseResults<SearchPhaseResult> results,
@ -90,7 +93,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
Collections.singletonList(
new SearchShardIterator(null, null, Collections.emptyList(), null)
)
), timeProvider, 0, null,
), timeProvider, ClusterState.EMPTY_STATE, null,
results, request.getMaxConcurrentShardRequests(),
SearchResponse.Clusters.EMPTY) {
@Override
@ -110,7 +113,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
}
@Override
public void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) {
public void sendReleaseSearchContext(SearchContextId contextId, Transport.Connection connection,
OriginalIndices originalIndices) {
releasedContexts.add(contextId);
}
};
@ -191,7 +195,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
Set<Long> requestIds = new HashSet<>();
Set<SearchContextId> requestIds = new HashSet<>();
List<Tuple<String, String>> nodeLookups = new ArrayList<>();
int numFailures = randomIntBetween(1, 5);
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = phaseResults(requestIds, nodeLookups, numFailures);
@ -219,7 +223,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
Set<Long> requestIds = new HashSet<>();
Set<SearchContextId> requestIds = new HashSet<>();
List<Tuple<String, String>> nodeLookups = new ArrayList<>();
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = phaseResults(requestIds, nodeLookups, 0);
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong());
@ -262,16 +266,16 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
assertEquals(0, searchPhaseExecutionException.getSuppressed().length);
}
private static ArraySearchPhaseResults<SearchPhaseResult> phaseResults(Set<Long> requestIds,
private static ArraySearchPhaseResults<SearchPhaseResult> phaseResults(Set<SearchContextId> contextIds,
List<Tuple<String, String>> nodeLookups,
int numFailures) {
int numResults = randomIntBetween(1, 10);
ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(numResults + numFailures);
for (int i = 0; i < numResults; i++) {
long requestId = randomLong();
requestIds.add(requestId);
SearchPhaseResult phaseResult = new PhaseResult(requestId);
SearchContextId contextId = new SearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong());
contextIds.add(contextId);
SearchPhaseResult phaseResult = new PhaseResult(contextId);
String resultClusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
String resultNodeId = randomAlphaOfLengthBetween(5, 10);
ShardId resultShardId = new ShardId("index", "index-uuid", i);
@ -284,8 +288,8 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
}
private static final class PhaseResult extends SearchPhaseResult {
PhaseResult(long requestId) {
this.requestId = requestId;
PhaseResult(SearchContextId contextId) {
this.contextId = contextId;
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -90,7 +91,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
searchRequest, null, shardsIter, timeProvider, 0, null,
searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
@ -158,7 +159,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
searchRequest, null, shardsIter, timeProvider, 0, null,
searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
@ -225,7 +226,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
null,
shardsIter,
timeProvider,
0,
ClusterState.EMPTY_STATE,
null,
(iter) -> new AbstractSearchAsyncAction<SearchPhaseResult>(
"test",
@ -243,7 +244,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
responseListener,
iter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(iter.size()),
randomIntBetween(1, 32),
@ -325,7 +326,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
(clusterAlias, node) -> lookup.get(node),
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
searchRequest, null, shardsIter, timeProvider, 0, null,
searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null,
(iter) -> new SearchPhase("test") {
@Override
public void run() {

View File

@ -23,10 +23,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
@ -92,17 +94,20 @@ public class ClearScrollControllerTests extends ESTestCase {
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicArray<SearchPhaseResult> array = new AtomicArray<>(3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 1), node1);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 12), node2);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 42), node3);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
array.setOnce(1, testSearchPhaseResult2);
array.setOnce(2, testSearchPhaseResult3);
AtomicInteger numFreed = new AtomicInteger(0);
String scrollId = TransportSearchHelper.buildScrollId(array);
String scrollId = TransportSearchHelper.buildScrollId(array, randomBoolean());
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
ActionListener<ClearScrollResponse> listener = new LatchedActionListener<>(new ActionListener<ClearScrollResponse>() {
@ -121,7 +126,7 @@ public class ClearScrollControllerTests extends ESTestCase {
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId,
public void sendFreeContext(Transport.Connection connection, SearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener) {
nodesInvoked.add(connection.getNode());
boolean freed = randomBoolean();
@ -153,11 +158,14 @@ public class ClearScrollControllerTests extends ESTestCase {
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
AtomicArray<SearchPhaseResult> array = new AtomicArray<>(3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 1), node1);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 12), node2);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId(UUIDs.randomBase64UUID(), 42), node3);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
array.setOnce(1, testSearchPhaseResult2);
@ -165,7 +173,7 @@ public class ClearScrollControllerTests extends ESTestCase {
AtomicInteger numFreed = new AtomicInteger(0);
AtomicInteger numFailures = new AtomicInteger(0);
AtomicInteger numConnectionFailures = new AtomicInteger(0);
String scrollId = TransportSearchHelper.buildScrollId(array);
String scrollId = TransportSearchHelper.buildScrollId(array, randomBoolean());
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build();
CountDownLatch latch = new CountDownLatch(1);
@ -189,7 +197,7 @@ public class ClearScrollControllerTests extends ESTestCase {
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId,
public void sendFreeContext(Transport.Connection connection, SearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener) {
nodesInvoked.add(connection.getNode());
boolean freed = randomBoolean();

View File

@ -19,11 +19,13 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
@ -59,7 +61,7 @@ public class CountedCollectorTests extends ESTestCase {
case 1:
state.add(1);
executor.execute(() -> {
DfsSearchResult dfsSearchResult = new DfsSearchResult(shardID, null);
DfsSearchResult dfsSearchResult = new DfsSearchResult(new SearchContextId(UUIDs.randomBase64UUID(), shardID), null);
dfsSearchResult.setShardIndex(shardID);
dfsSearchResult.setSearchShardTarget(new SearchShardTarget("foo",
new ShardId("bar", "baz", shardID), null, OriginalIndices.NONE));
@ -84,7 +86,7 @@ public class CountedCollectorTests extends ESTestCase {
break;
case 1:
assertNotNull(results.get(i));
assertEquals(i, results.get(i).getRequestId());
assertEquals(i, results.get(i).getContextId().getId());
break;
case 2:
final int shardId = i;

View File

@ -25,6 +25,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -34,6 +35,7 @@ import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.test.ESTestCase;
@ -45,8 +47,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class DfsQueryPhaseTests extends ESTestCase {
private static DfsSearchResult newSearchResult(int shardIndex, long requestId, SearchShardTarget target) {
DfsSearchResult result = new DfsSearchResult(requestId, target);
private static DfsSearchResult newSearchResult(int shardIndex, SearchContextId contextId, SearchShardTarget target) {
DfsSearchResult result = new DfsSearchResult(contextId, target);
result.setShardIndex(shardIndex);
return result;
}
@ -54,8 +56,10 @@ public class DfsQueryPhaseTests extends ESTestCase {
public void testDfsWith2Shards() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(0, newSearchResult(0, new SearchContextId(UUIDs.randomBase64UUID(), 1),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, new SearchContextId(UUIDs.randomBase64UUID(), 2),
new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -65,24 +69,24 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
if (request.contextId().getId() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 123),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node2", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
} else if (request.contextId().getId() == 2) {
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 123),
new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F),
new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else {
fail("no such request ID: " + request.id());
fail("no such request ID: " + request.contextId());
}
}
};
@ -114,8 +118,10 @@ public class DfsQueryPhaseTests extends ESTestCase {
public void testDfsWith1ShardFailed() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
final SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 1);
final SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 2);
results.set(0, newSearchResult(0, ctx1, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, ctx2, new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -125,18 +131,18 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
if (request.contextId().getId() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 123),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(
new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
} else if (request.contextId().getId() == 2) {
listener.onFailure(new MockDirectoryWrapper.FakeIOException());
} else {
fail("no such request ID: " + request.id());
fail("no such request ID: " + request.contextId());
}
}
};
@ -163,7 +169,7 @@ public class DfsQueryPhaseTests extends ESTestCase {
assertEquals(1, mockSearchPhaseContext.failures.size());
assertTrue(mockSearchPhaseContext.failures.get(0).getCause() instanceof MockDirectoryWrapper.FakeIOException);
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(2L));
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(ctx2));
assertNull(responseRef.get().get(1));
}
@ -171,8 +177,10 @@ public class DfsQueryPhaseTests extends ESTestCase {
public void testFailPhaseOnException() throws IOException {
AtomicArray<DfsSearchResult> results = new AtomicArray<>(2);
AtomicReference<AtomicArray<SearchPhaseResult>> responseRef = new AtomicReference<>();
results.set(0, newSearchResult(0, 1, new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, 2, new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(0, newSearchResult(0, new SearchContextId(UUIDs.randomBase64UUID(), 1),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.set(1, newSearchResult(1, new SearchContextId(UUIDs.randomBase64UUID(), 2),
new SearchShardTarget("node2", new ShardId("test", "na", 0), null, OriginalIndices.NONE)));
results.get(0).termsStatistics(new Term[0], new TermStatistics[0]);
results.get(1).termsStatistics(new Term[0], new TermStatistics[0]);
@ -182,18 +190,18 @@ public class DfsQueryPhaseTests extends ESTestCase {
@Override
public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest request, SearchTask task,
SearchActionListener<QuerySearchResult> listener) {
if (request.id() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
if (request.contextId().getId() == 1) {
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 123),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(2); // the size of the result set
listener.onResponse(queryResult);
} else if (request.id() == 2) {
} else if (request.contextId().getId() == 2) {
throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException());
} else {
fail("no such request ID: " + request.id());
fail("no such request ID: " + request.contextId());
}
}
};

View File

@ -23,6 +23,8 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.shard.ShardId;
@ -35,6 +37,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
@ -70,7 +73,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
numHits = 0;
}
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -95,15 +98,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
ArraySearchPhaseResults<SearchPhaseResult> results = controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
final SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
QuerySearchResult queryResult = new QuerySearchResult(ctx1,
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
final SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 312);
queryResult = new QuerySearchResult(ctx2,
new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
@ -115,18 +121,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) {
if (request.contextId().equals(ctx2)) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
} else {
assertEquals(123, request.id());
assertEquals(ctx1, request.contextId());
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
}
listener.onResponse(fetchResult);
}
};
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -153,15 +159,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
QuerySearchResult queryResult = new QuerySearchResult(ctx1,
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 321);
queryResult = new QuerySearchResult(ctx2,
new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
@ -172,7 +181,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
@Override
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
if (request.id() == 321) {
if (request.contextId().getId() == 321) {
FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
@ -183,7 +192,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
}
};
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -202,7 +211,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
assertEquals(1, searchResponse.getShardFailures().length);
assertTrue(searchResponse.getShardFailures()[0].getCause() instanceof MockDirectoryWrapper.FakeIOException);
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(ctx1));
}
public void testFetchDocsConcurrently() throws InterruptedException {
@ -215,8 +224,8 @@ public class FetchSearchPhaseTests extends ESTestCase {
ArraySearchPhaseResults<SearchPhaseResult> results = controller.newSearchPhaseResults(NOOP,
mockSearchPhaseContext.getRequest(), numHits);
for (int i = 0; i < numHits; i++) {
QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId("", i),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(i+1, i)}), i), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
@ -229,14 +238,14 @@ public class FetchSearchPhaseTests extends ESTestCase {
SearchActionListener<FetchSearchResult> listener) {
new Thread(() -> {
FetchSearchResult fetchResult = new FetchSearchResult();
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit((int) (request.id()+1))},
fetchResult.hits(new SearchHits(new SearchHit[]{new SearchHit((int) (request.contextId().getId() + 1))},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 100F));
listener.onResponse(fetchResult);
}).start();
}
};
CountDownLatch latch = new CountDownLatch(1);
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -272,15 +281,16 @@ public class FetchSearchPhaseTests extends ESTestCase {
ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
int resultSetSize = randomIntBetween(2, 10);
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
QuerySearchResult queryResult = new QuerySearchResult(new SearchContextId("", 123),
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
queryResult = new QuerySearchResult(new SearchContextId("", 321),
new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
@ -295,18 +305,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
if (numFetches.incrementAndGet() == 1) {
throw new RuntimeException("BOOM");
}
if (request.id() == 321) {
if (request.contextId().getId() == 321) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
} else {
assertEquals(request, 123);
assertEquals(request.contextId().getId(), 123);
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F));
}
listener.onResponse(fetchResult);
}
};
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -328,15 +338,18 @@ public class FetchSearchPhaseTests extends ESTestCase {
ArraySearchPhaseResults<SearchPhaseResult> results =
controller.newSearchPhaseResults(NOOP, mockSearchPhaseContext.getRequest(), 2);
int resultSetSize = 1;
QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new ShardId("test", "na", 0),
null, OriginalIndices.NONE));
SearchContextId ctx1 = new SearchContextId(UUIDs.randomBase64UUID(), 123);
QuerySearchResult queryResult = new QuerySearchResult(ctx1,
new SearchShardTarget("node1", new ShardId("test", "na", 0), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(42, 1.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize); // the size of the result set
queryResult.setShardIndex(0);
results.consumeResult(queryResult);
queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
SearchContextId ctx2 = new SearchContextId(UUIDs.randomBase64UUID(), 321);
queryResult = new QuerySearchResult(ctx2,
new SearchShardTarget("node2", new ShardId("test", "na", 1), null, OriginalIndices.NONE));
queryResult.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(84, 2.0F)}), 2.0F), new DocValueFormat[0]);
queryResult.size(resultSetSize);
@ -348,7 +361,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRequest request, SearchTask task,
SearchActionListener<FetchSearchResult> listener) {
FetchSearchResult fetchResult = new FetchSearchResult();
if (request.id() == 321) {
if (request.contextId().equals(ctx2)) {
fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(84)},
new TotalHits(1, TotalHits.Relation.EQUAL_TO), 2.0F));
} else {
@ -357,7 +370,7 @@ public class FetchSearchPhaseTests extends ESTestCase {
listener.onResponse(fetchResult);
}
};
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, ClusterState.EMPTY_STATE,
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() {
@ -375,6 +388,6 @@ public class FetchSearchPhaseTests extends ESTestCase {
assertEquals(0, searchResponse.getFailedShards());
assertEquals(2, searchResponse.getSuccessfulShards());
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(ctx1));
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.transport.Transport;
import org.junit.Assert;
@ -46,7 +47,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
final AtomicInteger numSuccess;
final List<ShardSearchFailure> failures = Collections.synchronizedList(new ArrayList<>());
SearchTransportService searchTransport;
final Set<Long> releasedSearchContexts = new HashSet<>();
final Set<SearchContextId> releasedSearchContexts = new HashSet<>();
final SearchRequest searchRequest = new SearchRequest();
final AtomicReference<SearchResponse> searchResponse = new AtomicReference<>();
@ -135,7 +136,7 @@ public final class MockSearchPhaseContext implements SearchPhaseContext {
}
@Override
public void sendReleaseSearchContext(long contextId, Transport.Connection connection, OriginalIndices originalIndices) {
public void sendReleaseSearchContext(SearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
releasedSearchContexts.add(contextId);
}
}

View File

@ -21,18 +21,21 @@ package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
@ -106,7 +109,7 @@ public class SearchAsyncActionTests extends ESTestCase {
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
@ -122,7 +125,8 @@ public class SearchAsyncActionTests extends ESTestCase {
new Thread(() -> {
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new SearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()),
connection.getNode());
listener.onResponse(testSearchPhaseResult);
@ -211,7 +215,7 @@ public class SearchAsyncActionTests extends ESTestCase {
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
@ -232,8 +236,8 @@ public class SearchAsyncActionTests extends ESTestCase {
throw new AssertionError(e);
}
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new SearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
if (shardFailures[shard.shardId().id()]) {
listener.onFailure(new RuntimeException());
} else {
@ -277,7 +281,7 @@ public class SearchAsyncActionTests extends ESTestCase {
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
Map<DiscoveryNode, Set<Long>> nodeToContextMap = newConcurrentMap();
Map<DiscoveryNode, Set<SearchContextId>> nodeToContextMap = newConcurrentMap();
AtomicInteger contextIdGenerator = new AtomicInteger(0);
int numShards = randomIntBetween(1, 10);
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
@ -286,7 +290,7 @@ public class SearchAsyncActionTests extends ESTestCase {
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(null, null) {
@Override
public void sendFreeContext(Transport.Connection connection, long contextId, OriginalIndices originalIndices) {
public void sendFreeContext(Transport.Connection connection, SearchContextId contextId, OriginalIndices originalIndices) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId));
@ -314,7 +318,7 @@ public class SearchAsyncActionTests extends ESTestCase {
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
@ -326,10 +330,10 @@ public class SearchAsyncActionTests extends ESTestCase {
listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet());
ids.add(testSearchPhaseResult.getRequestId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new SearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
Set<SearchContextId> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet());
ids.add(testSearchPhaseResult.getContextId());
if (randomBoolean()) {
listener.onResponse(testSearchPhaseResult);
} else {
@ -345,7 +349,7 @@ public class SearchAsyncActionTests extends ESTestCase {
for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.getAtomicArray().get(i);
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE);
sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node), OriginalIndices.NONE);
}
responseListener.onResponse(response);
}
@ -419,7 +423,7 @@ public class SearchAsyncActionTests extends ESTestCase {
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
ClusterState.EMPTY_STATE,
null,
new ArraySearchPhaseResults<>(shardsIter.size()),
request.getMaxConcurrentShardRequests(),
@ -434,8 +438,8 @@ public class SearchAsyncActionTests extends ESTestCase {
});
new Thread(() -> {
Transport.Connection connection = getConnection(null, shard.currentNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
connection.getNode());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new SearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
if (shardIt.remaining() > 0) {
numFailReplicas.incrementAndGet();
listener.onFailure(new RuntimeException());
@ -513,9 +517,8 @@ public class SearchAsyncActionTests extends ESTestCase {
public static class TestSearchPhaseResult extends SearchPhaseResult {
final DiscoveryNode node;
TestSearchPhaseResult(long id, DiscoveryNode node) {
this.requestId = id;
TestSearchPhaseResult(SearchContextId contextId, DiscoveryNode node) {
this.contextId = contextId;
this.node = node;
}

View File

@ -31,6 +31,7 @@ import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.text.Text;
@ -50,6 +51,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.SortBy;
import org.elasticsearch.search.suggest.Suggest;
@ -230,7 +232,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
String clusterAlias = randomBoolean() ? null : "remote";
SearchShardTarget searchShardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex),
clusterAlias, OriginalIndices.NONE);
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex, searchShardTarget);
QuerySearchResult querySearchResult = new QuerySearchResult(new SearchContextId("", shardIndex), searchShardTarget);
final TopDocs topDocs;
float maxScore = 0;
if (searchHitsSize == 0) {
@ -302,7 +304,7 @@ public class SearchPhaseControllerTests extends ESTestCase {
float maxScore = -1F;
String clusterAlias = randomBoolean() ? null : "remote";
SearchShardTarget shardTarget = new SearchShardTarget("", new ShardId("", "", shardIndex), clusterAlias, OriginalIndices.NONE);
FetchSearchResult fetchSearchResult = new FetchSearchResult(shardIndex, shardTarget);
FetchSearchResult fetchSearchResult = new FetchSearchResult(new SearchContextId("", shardIndex), shardTarget);
List<SearchHit> searchHits = new ArrayList<>();
for (ScoreDoc scoreDoc : mergedSearchDocs) {
if (scoreDoc.shardIndex == shardIndex) {
@ -365,8 +367,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
numEmptyResponses --;
}
QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new ShardId("a", "b", 0),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 0),
new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 1.0D, DocValueFormat.RAW,
@ -375,7 +377,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
result.setShardIndex(0);
consumer.consumeResult(result);
result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 1),
new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 3.0D, DocValueFormat.RAW,
@ -384,7 +387,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
result.setShardIndex(2);
consumer.consumeResult(result);
result = new QuerySearchResult(1, new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), 1),
new SearchShardTarget("node", new ShardId("a", "b", 0), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN),
new DocValueFormat[0]);
aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", 2.0D, DocValueFormat.RAW,
@ -452,8 +456,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), id),
new SearchShardTarget("node", new ShardId("a", "b", id), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] {new ScoreDoc(0, number)}), number),
new DocValueFormat[0]);
@ -496,8 +500,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
for (int i = 0; i < expectedNumResults; i++) {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number),
new DocValueFormat[0]);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(new InternalMax("test", (double) number,
@ -534,8 +538,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
for (int i = 0; i < expectedNumResults; i++) {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO),
new ScoreDoc[] {new ScoreDoc(0, number)}), number), new DocValueFormat[0]);
result.setShardIndex(i);
@ -601,8 +605,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
searchPhaseController.newSearchPhaseResults(NOOP, request, 4);
int score = 100;
for (int i = 0; i < 4; i++) {
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
ScoreDoc[] docs = new ScoreDoc[3];
for (int j = 0; j < docs.length; j++) {
docs[j] = new ScoreDoc(0, score--);
@ -643,8 +647,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
max.updateAndGet(prev -> Math.max(prev, number));
FieldDoc[] fieldDocs = {new FieldDoc(0, Float.NaN, new Object[]{number})};
TopDocs topDocs = new TopFieldDocs(new TotalHits(1, Relation.EQUAL_TO), fieldDocs, sortFields);
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats);
result.setShardIndex(i);
result.size(size);
@ -681,8 +685,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
Object[] values = {randomFrom(collapseValues)};
FieldDoc[] fieldDocs = {new FieldDoc(0, Float.NaN, values)};
TopDocs topDocs = new CollapseTopFieldDocs("field", new TotalHits(1, Relation.EQUAL_TO), fieldDocs, sortFields, values);
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats);
result.setShardIndex(i);
result.size(size);
@ -714,8 +718,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
int maxScorePhrase = -1;
int maxScoreCompletion = -1;
for (int i = 0; i < expectedNumResults; i++) {
QuerySearchResult result = new QuerySearchResult(i, new SearchShardTarget("node", new ShardId("a", "b", i),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), i),
new SearchShardTarget("node", new ShardId("a", "b", i), null, OriginalIndices.NONE));
List<Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>> suggestions =
new ArrayList<>();
{
@ -840,8 +844,8 @@ public class SearchPhaseControllerTests extends ESTestCase {
threads[i] = new Thread(() -> {
int number = randomIntBetween(1, 1000);
max.updateAndGet(prev -> Math.max(prev, number));
QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new ShardId("a", "b", id),
null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId(UUIDs.randomBase64UUID(), id),
new SearchShardTarget("node", new ShardId("a", "b", id), null, OriginalIndices.NONE));
result.topDocs(new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[]{new ScoreDoc(0, number)}), number),
new DocValueFormat[0]);
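Every call site in the hunks above swaps a bare long for a SearchContextId built from a reader UUID plus the old counter value. A minimal sketch of the value type these tests imply (constructor and accessors mirror the calls above; equals/hashCode semantics are an assumption, not the actual class in org.elasticsearch.search.internal):

import java.util.Objects;

// Sketch only: illustrates the shape the tests construct, not the real class.
public final class SearchContextId {
    private final String readerId; // UUID minted per reader; "" when unknown
    private final long id;         // the old per-node monotonic counter

    public SearchContextId(String readerId, long id) {
        this.readerId = Objects.requireNonNull(readerId);
        this.id = id;
    }

    public String getReaderId() { return readerId; }
    public long getId() { return id; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SearchContextId that = (SearchContextId) o;
        return id == that.id && readerId.equals(that.readerId);
    }

    @Override
    public int hashCode() { return Objects.hash(readerId, id); }
}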

View File

@ -23,12 +23,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
@ -45,11 +47,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
public void testSendRequestsToNodes() throws InterruptedException {
ParsedScrollId scrollId = getParsedScrollId(
new ScrollIdForNode(null, "node1", 1),
new ScrollIdForNode(null, "node2", 2),
new ScrollIdForNode(null, "node3", 17),
new ScrollIdForNode(null, "node1", 0),
new ScrollIdForNode(null, "node3", 0));
new ScrollIdForNode(null, "node1", new SearchContextId(UUIDs.randomBase64UUID(), 1)),
new ScrollIdForNode(null, "node2", new SearchContextId(UUIDs.randomBase64UUID(), 2)),
new ScrollIdForNode(null, "node3", new SearchContextId(UUIDs.randomBase64UUID(), 17)),
new ScrollIdForNode(null, "node1", new SearchContextId(UUIDs.randomBase64UUID(), 0)),
new ScrollIdForNode(null, "node3", new SearchContextId(UUIDs.randomBase64UUID(), 0)));
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT))
@ -70,7 +72,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
{
new Thread(() -> {
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.contextId(), connection.getNode());
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
new ShardId("test", "_na_", 1), null, OriginalIndices.NONE));
searchActionListener.onResponse(testSearchPhaseResult);
@ -106,7 +108,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < results.length(); i++) {
assertNotNull(results.get(i));
assertEquals(context[i].getScrollId(), results.get(i).getRequestId());
assertEquals(context[i].getContextId(), results.get(i).getContextId());
assertEquals(context[i].getNode(), results.get(i).node.getId());
}
}
@ -114,11 +116,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
public void testFailNextPhase() throws InterruptedException {
ParsedScrollId scrollId = getParsedScrollId(
new ScrollIdForNode(null, "node1", 1),
new ScrollIdForNode(null, "node2", 2),
new ScrollIdForNode(null, "node3", 17),
new ScrollIdForNode(null, "node1", 0),
new ScrollIdForNode(null, "node3", 0));
new ScrollIdForNode(null, "node1", new SearchContextId("", 1)),
new ScrollIdForNode(null, "node2", new SearchContextId("a", 2)),
new ScrollIdForNode(null, "node3", new SearchContextId("b", 17)),
new ScrollIdForNode(null, "node1", new SearchContextId("c", 0)),
new ScrollIdForNode(null, "node3", new SearchContextId("d", 0)));
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT))
@ -161,7 +163,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
{
new Thread(() -> {
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.contextId(), connection.getNode());
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
new ShardId("test", "_na_", 1), null, OriginalIndices.NONE));
searchActionListener.onResponse(testSearchPhaseResult);
@ -197,18 +199,18 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < results.length(); i++) {
assertNotNull(results.get(i));
assertEquals(context[i].getScrollId(), results.get(i).getRequestId());
assertEquals(context[i].getContextId(), results.get(i).getContextId());
assertEquals(context[i].getNode(), results.get(i).node.getId());
}
}
public void testNodeNotAvailable() throws InterruptedException {
ParsedScrollId scrollId = getParsedScrollId(
new ScrollIdForNode(null, "node1", 1),
new ScrollIdForNode(null, "node2", 2),
new ScrollIdForNode(null, "node3", 17),
new ScrollIdForNode(null, "node1", 0),
new ScrollIdForNode(null, "node3", 0));
new ScrollIdForNode(null, "node1", new SearchContextId("", 1)),
new ScrollIdForNode(null, "node2", new SearchContextId("", 2)),
new ScrollIdForNode(null, "node3", new SearchContextId("", 17)),
new ScrollIdForNode(null, "node1", new SearchContextId("", 0)),
new ScrollIdForNode(null, "node3", new SearchContextId("", 0)));
// node2 is not available
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
@ -234,7 +236,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
}
new Thread(() -> {
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.contextId(), connection.getNode());
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
new ShardId("test", "_na_", 1), null, OriginalIndices.NONE));
searchActionListener.onResponse(testSearchPhaseResult);
@ -275,7 +277,7 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
assertNull(results.get(i));
} else {
assertNotNull(results.get(i));
assertEquals(context[i].getScrollId(), results.get(i).getRequestId());
assertEquals(context[i].getContextId(), results.get(i).getContextId());
assertEquals(context[i].getNode(), results.get(i).node.getId());
}
}
@ -283,11 +285,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
public void testShardFailures() throws InterruptedException {
ParsedScrollId scrollId = getParsedScrollId(
new ScrollIdForNode(null, "node1", 1),
new ScrollIdForNode(null, "node2", 2),
new ScrollIdForNode(null, "node3", 17),
new ScrollIdForNode(null, "node1", 0),
new ScrollIdForNode(null, "node3", 0));
new ScrollIdForNode(null, "node1", new SearchContextId("", 1)),
new ScrollIdForNode(null, "node2", new SearchContextId("", 2)),
new ScrollIdForNode(null, "node3", new SearchContextId("",17)),
new ScrollIdForNode(null, "node1", new SearchContextId("", 0)),
new ScrollIdForNode(null, "node3", new SearchContextId("", 0)));
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT))
@ -307,11 +309,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
SearchActionListener<SearchAsyncActionTests.TestSearchPhaseResult> searchActionListener)
{
new Thread(() -> {
if (internalRequest.id() == 17) {
if (internalRequest.contextId().getId() == 17) {
searchActionListener.onFailure(new IllegalArgumentException("BOOM on shard"));
} else {
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult =
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode());
new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.contextId(), connection.getNode());
testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(),
new ShardId("test", "_na_", 1), null, OriginalIndices.NONE));
searchActionListener.onResponse(testSearchPhaseResult);
@ -349,11 +351,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
ScrollIdForNode[] context = scrollId.getContext();
for (int i = 0; i < results.length(); i++) {
if (context[i].getScrollId() == 17) {
if (context[i].getContextId().getId() == 17) {
assertNull(results.get(i));
} else {
assertNotNull(results.get(i));
assertEquals(context[i].getScrollId(), results.get(i).getRequestId());
assertEquals(context[i].getContextId(), results.get(i).getContextId());
assertEquals(context[i].getNode(), results.get(i).node.getId());
}
}
@ -361,11 +363,11 @@ public class SearchScrollAsyncActionTests extends ESTestCase {
public void testAllShardsFailed() throws InterruptedException {
ParsedScrollId scrollId = getParsedScrollId(
new ScrollIdForNode(null, "node1", 1),
new ScrollIdForNode(null, "node2", 2),
new ScrollIdForNode(null, "node3", 17),
new ScrollIdForNode(null, "node1", 0),
new ScrollIdForNode(null, "node3", 0));
new ScrollIdForNode(null, "node1", new SearchContextId("", 1)),
new ScrollIdForNode(null, "node2", new SearchContextId("", 2)),
new ScrollIdForNode(null, "node3", new SearchContextId("", 17)),
new ScrollIdForNode(null, "node1", new SearchContextId("", 0)),
new ScrollIdForNode(null, "node3", new SearchContextId("", 0)));
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT))

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@ -32,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
@ -57,12 +59,13 @@ public class SearchScrollRequestTests extends ESTestCase {
public void testInternalScrollSearchRequestSerialization() throws IOException {
SearchScrollRequest searchScrollRequest = createSearchScrollRequest();
InternalScrollSearchRequest internalScrollSearchRequest = new InternalScrollSearchRequest(searchScrollRequest, randomLong());
InternalScrollSearchRequest internalScrollSearchRequest =
new InternalScrollSearchRequest(searchScrollRequest, new SearchContextId(UUIDs.randomBase64UUID(), randomLong()));
try (BytesStreamOutput output = new BytesStreamOutput()) {
internalScrollSearchRequest.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
InternalScrollSearchRequest deserializedRequest = new InternalScrollSearchRequest(in);
assertEquals(deserializedRequest.id(), internalScrollSearchRequest.id());
assertEquals(deserializedRequest.contextId().getId(), internalScrollSearchRequest.contextId().getId());
assertEquals(deserializedRequest.scroll(), internalScrollSearchRequest.scroll());
assertNotSame(deserializedRequest, internalScrollSearchRequest);
}

View File

@ -24,10 +24,13 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class TransportSearchHelperTests extends ESTestCase {
public void testParseScrollId() throws IOException {
@ -35,29 +38,48 @@ public class TransportSearchHelperTests extends ESTestCase {
DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId("x", 1), node1);
testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), "cluster_x", null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId("y", 12), node2);
testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), "cluster_y", null));
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3);
SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 =
new SearchAsyncActionTests.TestSearchPhaseResult(new SearchContextId("z", 42), node3);
testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
array.setOnce(0, testSearchPhaseResult1);
array.setOnce(1, testSearchPhaseResult2);
array.setOnce(2, testSearchPhaseResult3);
String scrollId = TransportSearchHelper.buildScrollId(array);
boolean includeUUID = randomBoolean();
String scrollId = TransportSearchHelper.buildScrollId(array, includeUUID);
ParsedScrollId parseScrollId = TransportSearchHelper.parseScrollId(scrollId);
assertEquals(3, parseScrollId.getContext().length);
assertEquals("node_1", parseScrollId.getContext()[0].getNode());
assertEquals("cluster_x", parseScrollId.getContext()[0].getClusterAlias());
assertEquals(1, parseScrollId.getContext()[0].getScrollId());
assertEquals(1, parseScrollId.getContext()[0].getContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[0].getContextId().getReaderId(), equalTo("x"));
} else {
assertThat(parseScrollId.getContext()[0].getContextId().getReaderId(), equalTo(""));
}
assertEquals("node_2", parseScrollId.getContext()[1].getNode());
assertEquals("cluster_y", parseScrollId.getContext()[1].getClusterAlias());
assertEquals(12, parseScrollId.getContext()[1].getScrollId());
assertEquals(12, parseScrollId.getContext()[1].getContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[1].getContextId().getReaderId(), equalTo("y"));
} else {
assertThat(parseScrollId.getContext()[1].getContextId().getReaderId(), equalTo(""));
}
assertEquals("node_3", parseScrollId.getContext()[2].getNode());
assertNull(parseScrollId.getContext()[2].getClusterAlias());
assertEquals(42, parseScrollId.getContext()[2].getScrollId());
assertEquals(42, parseScrollId.getContext()[2].getContextId().getId());
if (includeUUID) {
assertThat(parseScrollId.getContext()[2].getContextId().getReaderId(), equalTo("z"));
} else {
assertThat(parseScrollId.getContext()[2].getContextId().getReaderId(), equalTo(""));
}
}
}
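The includeUUID branch above exists for wire compatibility: a scroll id that embeds reader UUIDs is only safe once every node can parse it, and parsing an old-format id yields an empty reader id, which is exactly what the else branches assert. A sketch of the encoding side under those assumptions (JDK streams stand in for the real byte utilities, the boolean marker is invented, cluster aliases are omitted, and the ScrollIdForNode shape sketched earlier is reused):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Base64;

// Hedged sketch of a scroll id that optionally embeds reader UUIDs; the real
// format differs, this only shows why parsed legacy ids come back with "".
final class ScrollIdCodec {
    static String encode(ScrollIdForNode[] targets, boolean includeUUID) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(includeUUID); // assumed marker, not the real type tag
            out.writeInt(targets.length);
            for (ScrollIdForNode target : targets) {
                out.writeUTF(includeUUID ? target.getContextId().getReaderId() : "");
                out.writeLong(target.getContextId().getId());
                out.writeUTF(target.getNode());
            }
        }
        return Base64.getUrlEncoder().encodeToString(bytes.toByteArray());
    }
}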

View File

@ -31,6 +31,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@ -51,6 +52,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.rescore.RescoreContext;
import org.elasticsearch.search.slice.SliceBuilder;
@ -119,8 +121,8 @@ public class DefaultSearchContextTests extends ESTestCase {
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, target, searcher, null, indexService,
indexShard, bigArrays, null, timeout, null, Version.CURRENT);
DefaultSearchContext context1 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 1L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
context1.from(300);
// resultWindow greater than maxResultWindow and scrollContext is null
@ -160,8 +162,8 @@ public class DefaultSearchContextTests extends ESTestCase {
+ "] index level setting."));
// rescore is null but sliceBuilder is not null
DefaultSearchContext context2 = new DefaultSearchContext(2L, shardSearchRequest, target, searcher,
null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
DefaultSearchContext context2 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 2L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
SliceBuilder sliceBuilder = mock(SliceBuilder.class);
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
@ -177,8 +179,8 @@ public class DefaultSearchContextTests extends ESTestCase {
when(shardSearchRequest.getAliasFilter()).thenReturn(AliasFilter.EMPTY);
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);
DefaultSearchContext context3 = new DefaultSearchContext(3L, shardSearchRequest, target, searcher, null,
indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
DefaultSearchContext context3 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 3L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
@ -187,8 +189,8 @@ public class DefaultSearchContextTests extends ESTestCase {
when(queryShardContext.fieldMapper(anyString())).thenReturn(mock(MappedFieldType.class));
when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]);
DefaultSearchContext context4 = new DefaultSearchContext(4L, shardSearchRequest, target, searcher, null,
indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
DefaultSearchContext context4 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 4L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, Version.CURRENT);
context4.sliceBuilder(new SliceBuilder(1, 2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
context4.sliceBuilder(new SliceBuilder(0, 2)).parsedQuery(parsedQuery).preProcess(false);

View File

@ -38,6 +38,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -77,6 +78,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.SuggestBuilder;
@ -84,6 +86,7 @@ import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@ -321,12 +324,12 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
SearchPhaseResult searchPhaseResult = result.get();
IntArrayList intCursors = new IntArrayList(1);
intCursors.add(0);
ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getRequestId(), intCursors, null/* not a scroll */);
ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getContextId(), intCursors, null/* not a scroll */);
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
service.freeContext(searchPhaseResult.getContextId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
@ -929,4 +932,40 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
searcher.join();
}
}
public void testLookUpSearchContext() throws Exception {
createIndex("index");
SearchService searchService = getInstanceFromNode(SearchService.class);
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
IndexShard indexShard = indexService.getShard(0);
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true),
indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null);
List<SearchContextId> contextIds = new ArrayList<>();
int numContexts = randomIntBetween(1, 10);
for (int i = 0; i < numContexts; i++) {
SearchService.SearchRewriteContext rewriteContext = searchService.acquireSearcherAndRewrite(shardSearchRequest, indexShard);
final SearchContext searchContext = searchService.createContext(rewriteContext);
assertThat(searchContext.id().getId(), equalTo((long) (i + 1)));
searchService.putContext(searchContext);
contextIds.add(searchContext.id());
}
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
while (contextIds.isEmpty() == false) {
final SearchContextId contextId = randomFrom(contextIds);
assertFalse(searchService.freeContext(new SearchContextId(UUIDs.randomBase64UUID(), contextId.getId())));
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
if (randomBoolean()) {
assertTrue(searchService.freeContext(contextId));
} else {
assertTrue(searchService.freeContext(new SearchContextId("", contextId.getId())));
}
contextIds.remove(contextId);
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
assertFalse(searchService.freeContext(new SearchContextId("", contextId.getId())));
assertFalse(searchService.freeContext(contextId));
assertThat(searchService.getActiveContexts(), equalTo(contextIds.size()));
}
}
}
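testLookUpSearchContext pins down the freeing contract: a random UUID never frees a live context, the exact id does, an id with an empty reader UUID (the legacy, pre-upgrade form) also does, and a second free is a no-op. A sketch of that rule with assumed member names (activeContexts and SearchContext#id() are stand-ins for the real fields of SearchService):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the freeing rule the test exercises.
final class ContextTable {
    private final Map<Long, SearchContext> activeContexts = new ConcurrentHashMap<>();

    boolean freeContext(SearchContextId id) {
        SearchContext ctx = activeContexts.get(id.getId());
        if (ctx == null) {
            return false;
        }
        // "" acts as a wildcard so ids minted by pre-UUID nodes keep working
        if (id.getReaderId().isEmpty() == false
                && id.getReaderId().equals(ctx.id().getReaderId()) == false) {
            return false;
        }
        return activeContexts.remove(id.getId()) != null;
    }
}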

View File

@ -37,6 +37,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -58,7 +59,8 @@ public class QuerySearchResultTests extends ESTestCase {
private static QuerySearchResult createTestInstance() throws Exception {
ShardId shardId = new ShardId("index", "uuid", randomInt());
QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
QuerySearchResult result = new QuerySearchResult(new SearchContextId("", randomLong()),
new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
if (randomBoolean()) {
result.terminatedEarly(randomBoolean());
}
@ -79,7 +81,7 @@ public class QuerySearchResultTests extends ESTestCase {
QuerySearchResult querySearchResult = createTestInstance();
Version version = VersionUtils.randomVersion(random());
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
assertEquals(querySearchResult.getContextId(), deserialized.getContextId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
@ -122,7 +124,7 @@ public class QuerySearchResultTests extends ESTestCase {
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) {
in.setVersion(Version.V_7_0_0);
QuerySearchResult querySearchResult = new QuerySearchResult(in);
assertEquals(100, querySearchResult.getRequestId());
assertEquals(100, querySearchResult.getContextId().getId());
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = (InternalAggregations) querySearchResult.consumeAggs();
assertEquals(1, aggs.asList().size());
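The V_7_0_0 stream test above passes because the context id stays wire compatible: old streams carry only the long, so the reader UUID defaults to empty on read. A sketch of that gate (the StreamInput calls are real Elasticsearch APIs; the cutover version constant is an assumption for illustration):

// Hedged sketch of version-gated reading of a SearchContextId.
static SearchContextId readContextId(StreamInput in) throws IOException {
    final String readerId;
    if (in.getVersion().onOrAfter(Version.V_7_7_0)) { // assumed cutover version
        readerId = in.readString();
    } else {
        readerId = ""; // pre-UUID nodes never sent one
    }
    return new SearchContextId(readerId, in.readLong());
}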

View File

@ -21,9 +21,11 @@ package org.elasticsearch.search.scroll;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
@ -34,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
@ -41,10 +44,12 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.junit.After;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
@ -656,6 +661,42 @@ public class SearchScrollIT extends ESIntegTestCase {
}
}
public void testRestartDataNodesDuringScrollSearch() throws Exception {
final String dataNode = internalCluster().startDataOnlyNode();
createIndex("demo", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", dataNode)
.build());
createIndex("prod", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", dataNode)
.build());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
index("demo", "_doc", "demo-" + i, Collections.emptyMap());
index("prod", "_doc", "prod-" + i, Collections.emptyMap());
}
client().admin().indices().prepareRefresh().get();
SearchResponse respFromDemoIndex = client().prepareSearch("demo")
.setSize(randomIntBetween(1, 10))
.setQuery(new MatchAllQueryBuilder()).setScroll(TimeValue.timeValueMinutes(5)).get();
internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback());
ensureGreen("demo", "prod");
SearchResponse respFromProdIndex = client().prepareSearch("prod")
.setSize(randomIntBetween(1, 10))
.setQuery(new MatchAllQueryBuilder()).setScroll(TimeValue.timeValueMinutes(5)).get();
assertNoFailures(respFromProdIndex);
SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class,
() -> client().prepareSearchScroll(respFromDemoIndex.getScrollId()).get());
for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
}
client().prepareSearchScroll(respFromProdIndex.getScrollId()).get();
}
private void assertToXContentResponse(ClearScrollResponse response, boolean succeed, int numFreed) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
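testRestartDataNodesDuringScrollSearch is the end-to-end reproduction of the bug in the commit message: after the restart the node's counter starts over, so the stale scroll id's long can collide with a live context that belongs to a different index. With the UUID in place the lookup rejects the stale id instead, which is what the "No search context found" assertion checks. A sketch of that guard (exception and accessor names are taken from the surrounding tests; the lookup table is assumed):

// Hedged sketch of the lookup guard the restart test relies on.
SearchContext findContext(SearchContextId contextId) {
    SearchContext ctx = activeContexts.get(contextId.getId());
    // "" matches any reader so ids minted before the upgrade stay usable
    if (ctx == null
            || (contextId.getReaderId().isEmpty() == false
                && contextId.getReaderId().equals(ctx.id().getReaderId()) == false)) {
        throw new SearchContextMissingException(contextId); // "No search context found for id [...]"
    }
    return ctx;
}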

View File

@ -52,6 +52,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.SearchContextHighlight;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QuerySearchResult;
@ -132,8 +133,8 @@ public class TestSearchContext extends SearchContext {
}
@Override
public long id() {
return 0;
public SearchContextId id() {
return new SearchContextId("", 0);
}
@Override

View File

@ -11,6 +11,7 @@ import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
@ -78,7 +79,7 @@ public final class SecuritySearchOperationListener implements SearchOperationLis
* (or lookup) realm. To work around this we compare the username and the originating realm type.
*/
static void ensureAuthenticatedUserIsSame(Authentication original, Authentication current, AuditTrailService auditTrailService,
long id, String action, TransportRequest request, String requestId,
SearchContextId id, String action, TransportRequest request, String requestId,
AuthorizationInfo authorizationInfo) {
// this is really a best effort attempt since we cannot guarantee principal uniqueness
// and realm names can change between nodes.

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.security.authz;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -15,6 +16,7 @@ import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.transport.TransportRequest;
@ -127,7 +129,7 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
final InternalScrollSearchRequest request = new InternalScrollSearchRequest();
SearchContextMissingException expected =
expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request));
assertEquals(testSearchContext.id(), expected.id());
assertEquals(testSearchContext.id(), expected.contextId());
verify(licenseState, times(3)).isAuthAllowed();
verify(auditTrailService).accessDenied(eq(null), eq(authentication), eq("action"), eq(request),
authzInfoRoles(authentication.getUser().roles()));
@ -163,7 +165,7 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
final InternalScrollSearchRequest request = new InternalScrollSearchRequest();
SearchContextMissingException expected =
expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request));
assertEquals(testSearchContext.id(), expected.id());
assertEquals(testSearchContext.id(), expected.contextId());
verify(licenseState, times(5)).isAuthAllowed();
verify(auditTrailService).accessDenied(eq(null), eq(authentication), eq("action"), eq(request),
authzInfoRoles(authentication.getUser().roles()));
@ -174,13 +176,13 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
Authentication original = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null);
Authentication current =
randomBoolean() ? original : new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null);
long id = randomLong();
SearchContextId contextId = new SearchContextId(UUIDs.randomBase64UUID(), randomLong());
final String action = randomAlphaOfLength(4);
TransportRequest request = Empty.INSTANCE;
AuditTrailService auditTrail = mock(AuditTrailService.class);
final String auditId = randomAlphaOfLengthBetween(8, 20);
ensureAuthenticatedUserIsSame(original, current, auditTrail, id, action, request, auditId,
ensureAuthenticatedUserIsSame(original, current, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles()));
verifyZeroInteractions(auditTrail);
@ -188,7 +190,7 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
User user = new User(new User("test", "role"), new User("authenticated", "runas"));
current = new Authentication(user, new RealmRef("realm", "file", "node"),
new RealmRef(randomAlphaOfLengthBetween(1, 16), "file", "node"));
ensureAuthenticatedUserIsSame(original, current, auditTrail, id, action, request, auditId,
ensureAuthenticatedUserIsSame(original, current, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles()));
verifyZeroInteractions(auditTrail);
@ -196,7 +198,7 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
current = new Authentication(user, new RealmRef("realm", "file", "node"),
new RealmRef(randomAlphaOfLengthBetween(1, 16), "file", "node"));
Authentication runAs = current;
ensureAuthenticatedUserIsSame(runAs, current, auditTrail, id, action, request, auditId,
ensureAuthenticatedUserIsSame(runAs, current, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles()));
verifyZeroInteractions(auditTrail);
@ -204,9 +206,9 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
Authentication differentRealmType =
new Authentication(new User("test", "role"), new RealmRef("realm", randomAlphaOfLength(5), "node"), null);
SearchContextMissingException e = expectThrows(SearchContextMissingException.class,
() -> ensureAuthenticatedUserIsSame(original, differentRealmType, auditTrail, id, action, request, auditId,
() -> ensureAuthenticatedUserIsSame(original, differentRealmType, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles())));
assertEquals(id, e.id());
assertEquals(contextId, e.contextId());
verify(auditTrail).accessDenied(eq(auditId), eq(differentRealmType), eq(action), eq(request),
authzInfoRoles(original.getUser().roles()));
@ -214,9 +216,9 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
Authentication differentUser =
new Authentication(new User("test2", "role"), new RealmRef("realm", "realm", "node"), null);
e = expectThrows(SearchContextMissingException.class,
() -> ensureAuthenticatedUserIsSame(original, differentUser, auditTrail, id, action, request, auditId,
() -> ensureAuthenticatedUserIsSame(original, differentUser, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles())));
assertEquals(id, e.id());
assertEquals(contextId, e.contextId());
verify(auditTrail).accessDenied(eq(auditId), eq(differentUser), eq(action), eq(request),
authzInfoRoles(original.getUser().roles()));
@ -224,18 +226,18 @@ public class SecuritySearchOperationListenerTests extends ESTestCase {
Authentication diffRunAs = new Authentication(new User(new User("test2", "role"), new User("authenticated", "runas")),
new RealmRef("realm", "file", "node1"), new RealmRef("realm", "file", "node1"));
e = expectThrows(SearchContextMissingException.class,
() -> ensureAuthenticatedUserIsSame(original, diffRunAs, auditTrail, id, action, request, auditId,
() -> ensureAuthenticatedUserIsSame(original, diffRunAs, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles())));
assertEquals(id, e.id());
assertEquals(contextId, e.contextId());
verify(auditTrail).accessDenied(eq(auditId), eq(diffRunAs), eq(action), eq(request), authzInfoRoles(original.getUser().roles()));
// run as different looked up by type
Authentication runAsDiffType = new Authentication(user, new RealmRef("realm", "file", "node"),
new RealmRef(randomAlphaOfLengthBetween(1, 16), randomAlphaOfLengthBetween(5, 12), "node"));
e = expectThrows(SearchContextMissingException.class,
() -> ensureAuthenticatedUserIsSame(runAs, runAsDiffType, auditTrail, id, action, request, auditId,
() -> ensureAuthenticatedUserIsSame(runAs, runAsDiffType, auditTrail, contextId, action, request, auditId,
() -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, original.getUser().roles())));
assertEquals(id, e.id());
assertEquals(contextId, e.contextId());
verify(auditTrail).accessDenied(eq(auditId), eq(runAsDiffType), eq(action), eq(request),
authzInfoRoles(original.getUser().roles()));
}
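The assertions above read the failing id back through contextId(). A sketch of the exception shape they imply (the real class extends ElasticsearchException and serializes over the wire; the message format is taken from the integration test's assertion):

// Sketch only: captures just what the tests assert about the exception.
public class SearchContextMissingException extends RuntimeException {
    private final SearchContextId contextId;

    public SearchContextMissingException(SearchContextId contextId) {
        super("No search context found for id [" + contextId.getId() + "]");
        this.contextId = contextId;
    }

    public SearchContextId contextId() {
        return contextId;
    }
}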