Always include the matching node when resolving point in time (#61658)

If shards are relocated to new nodes, then searches with a point in time
will fail, although a pit keeps search contexts open. This commit solves
this problem by reducing info used by SearchShardIterator and always
including the matching nodes when resolving a point in time.

Closes #61627
This commit is contained in:
Nhat Nguyen 2020-09-01 19:12:12 -04:00
parent 035f0638f4
commit 808c8689ac
19 changed files with 342 additions and 203 deletions

View File

@ -32,7 +32,6 @@ import org.elasticsearch.action.search.TransportSearchAction.SearchTimeProvider;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
@ -212,7 +211,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
successfulShardExecution(iterator);
}
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
@ -221,16 +220,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* we can continue (cf. InitialSearchPhase#maybeFork).
*/
if (shard == null) {
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
: null;
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard,
new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
new SearchActionListener<Result>(shard, shardIndex) {
@Override
public void innerOnResponse(Result result) {
try {
@ -243,7 +242,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
onShardFailure(shardIndex, shard, shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
@ -255,7 +254,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
@ -275,7 +274,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
* @param shard the shard routing to send the request for
* @param listener the listener to notify on response
*/
protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<Result> listener);
protected abstract void executePhaseOnShard(SearchShardIterator shardIt,
SearchShardTarget shard,
SearchActionListener<Result> listener);
private void fork(final Runnable runnable) {
executor.execute(new AbstractRunnable() {
@ -370,18 +371,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
return failures;
}
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
final SearchShardIterator shardIt, Exception e) {
private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
onShardFailure(shardIndex, shardTarget, e);
final ShardRouting nextShard = shardIt.nextOrNull();
onShardFailure(shardIndex, shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]",
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
shard != null ? shard : shardIt.shardId(), request, lastShard), e);
if (lastShard) {
onShardGroupFailure(shardIndex, shardTarget, e);
onShardGroupFailure(shardIndex, shard, e);
}
final int totalOps = this.totalOps.incrementAndGet();
if (totalOps == expectedTotalOps) {

View File

@ -23,8 +23,8 @@ import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -77,9 +77,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
}
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

View File

@ -23,7 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
@ -65,9 +65,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
}
@Override
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard,
final SearchActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()),
buildShardSearchRequest(shardIt) , getTask(), listener);
}

View File

@ -24,7 +24,6 @@ import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -75,11 +74,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt,
final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener) {
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
request, getTask(), listener);
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
}
@Override

View File

@ -21,16 +21,19 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Countable;
import org.elasticsearch.common.util.PlainIterator;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.ShardSearchContextId;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
@ -38,32 +41,36 @@ import java.util.Objects;
* the cluster alias.
* @see OriginalIndices
*/
public final class SearchShardIterator extends PlainShardIterator {
public final class SearchShardIterator implements Comparable<SearchShardIterator>, Countable {
private final OriginalIndices originalIndices;
private final String clusterAlias;
private final ShardId shardId;
private boolean skip = false;
private final ShardSearchContextId searchContextId;
private final TimeValue searchContextKeepAlive;
private final PlainIterator<String> targetNodesIterator;
/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
*
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param shards shards to iterate
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param shards shards to iterate
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
*/
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
this(clusterAlias, shardId, shards, originalIndices, null, null);
this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList()),
originalIndices, null, null);
}
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId,
List<ShardRouting> shards, OriginalIndices originalIndices,
List<String> targetNodeIds, OriginalIndices originalIndices,
ShardSearchContextId searchContextId, TimeValue searchContextKeepAlive) {
super(shardId, shards);
this.shardId = shardId;
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
this.originalIndices = originalIndices;
this.clusterAlias = clusterAlias;
this.searchContextId = searchContextId;
@ -86,12 +93,16 @@ public final class SearchShardIterator extends PlainShardIterator {
return clusterAlias;
}
/**
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
* @see SearchShardTarget
*/
SearchShardTarget newSearchShardTarget(String nodeId) {
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
SearchShardTarget nextOrNull() {
final String nodeId = targetNodesIterator.nextOrNull();
if (nodeId != null) {
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
}
return null;
}
int remaining() {
return targetNodesIterator.remaining();
}
/**
@ -105,6 +116,10 @@ public final class SearchShardIterator extends PlainShardIterator {
return searchContextKeepAlive;
}
List<String> getTargetNodeIds() {
return targetNodesIterator.asList();
}
/**
* Reset the iterator and mark it as skippable
* @see #skip()
@ -114,6 +129,10 @@ public final class SearchShardIterator extends PlainShardIterator {
skip = true;
}
void reset() {
targetNodesIterator.reset();
}
/**
* Returns <code>true</code> if the search execution should skip this shard since it can not match any documents given the query.
*/
@ -121,42 +140,33 @@ public final class SearchShardIterator extends PlainShardIterator {
return skip;
}
@Override
public int size() {
return targetNodesIterator.size();
}
ShardId shardId() {
return shardId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (super.equals(o) == false) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardIterator that = (SearchShardIterator) o;
return Objects.equals(clusterAlias, that.clusterAlias);
return shardId.equals(that.shardId) && Objects.equals(clusterAlias, that.clusterAlias);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), clusterAlias);
return Objects.hash(clusterAlias, shardId);
}
@Override
public int compareTo(ShardIterator o) {
int superCompareTo = super.compareTo(o);
if (superCompareTo != 0 || (o instanceof SearchShardIterator == false)) {
return superCompareTo;
}
SearchShardIterator searchShardIterator = (SearchShardIterator)o;
if (clusterAlias == null && searchShardIterator.getClusterAlias() == null) {
return 0;
}
if (clusterAlias == null) {
return -1;
}
if (searchShardIterator.getClusterAlias() == null) {
return 1;
}
return clusterAlias.compareTo(searchShardIterator.getClusterAlias());
public int compareTo(SearchShardIterator o) {
return Comparator.comparing(SearchShardIterator::shardId)
.thenComparing(SearchShardIterator::getClusterAlias, Comparator.nullsFirst(String::compareTo))
.compare(this, o);
}
}

View File

@ -245,11 +245,10 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
indexRoutings, executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task,
new ArraySearchPhaseResults<>(shardsIts.size()), 1, clusters) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<SearchPhaseResult> listener) {
final Transport.Connection connection = getConnection(shardIt.getClusterAlias(), shard.currentNodeId());
final SearchShardTarget searchShardTarget = shardIt.newSearchShardTarget(shard.currentNodeId());
phaseSearchAction.executeOnShardTarget(task, searchShardTarget, connection, listener);
final Transport.Connection connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
phaseSearchAction.executeOnShardTarget(task, shard, connection, listener);
}
@Override
@ -597,7 +596,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices, null, null))
searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
.collect(Collectors.toList());
aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
indexRoutings = routingMap;
@ -875,7 +874,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OriginalIndices(e.getValue().toArray(new String[0]), indicesOptions)));
}
static List<SearchShardIterator> getSearchShardsFromSearchContexts(ClusterState clusterState, OriginalIndices originalIndices,
static List<SearchShardIterator> getSearchShardsFromSearchContexts(ClusterState clusterState,
OriginalIndices originalIndices,
String localClusterAlias,
SearchContextId searchContext,
TimeValue keepAlive) {
@ -883,15 +883,17 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final ShardId shardId = entry.getKey();
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<ShardRouting> matchingNodeFirstRoutings = new ArrayList<>();
final List<String> matchingNodeFirst = new ArrayList<>(shards.size());
final String nodeId = entry.getValue().getNode();
// always search the matching node first even when its shard was relocated to another node
// because the point in time should keep the corresponding search context open.
matchingNodeFirst.add(nodeId);
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(entry.getValue().getNode())) {
matchingNodeFirstRoutings.add(0, shard);
} else {
matchingNodeFirstRoutings.add(shard);
if (shard.currentNodeId().equals(nodeId) == false) {
matchingNodeFirst.add(shard.currentNodeId());
}
}
iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirstRoutings, originalIndices,
iterators.add(new SearchShardIterator(localClusterAlias, shardId, matchingNodeFirst, originalIndices,
entry.getValue().getSearchContextId(), keepAlive));
}
return iterators;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.util.Countable;
import java.util.Iterator;
import java.util.List;
@ -30,16 +31,15 @@ import java.util.List;
* ShardsIterators are always returned in ascending order independently of their order at construction
* time. The incoming iterators are sorted to ensure consistent iteration behavior across Nodes / JVMs.
*/
public final class GroupShardsIterator<ShardIt extends ShardIterator> implements Iterable<ShardIt> {
public final class GroupShardsIterator<ShardIt extends Comparable<ShardIt> & Countable> implements Iterable<ShardIt> {
private final List<ShardIt> iterators;
/**
* Constructs a new sorted GroupShardsIterator from the given list. Items are sorted based on their natural ordering.
* @see PlainShardIterator#compareTo(ShardIterator)
* @see org.elasticsearch.action.search.SearchShardIterator#compareTo(ShardIterator)
*/
public static <ShardIt extends ShardIterator> GroupShardsIterator<ShardIt> sortAndCreate(List<ShardIt> iterators) {
public static <ShardIt extends Comparable<ShardIt> & Countable> GroupShardsIterator<ShardIt> sortAndCreate(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
return new GroupShardsIterator<>(iterators);
}
@ -56,11 +56,7 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
* @return total number of shards
*/
public int totalSize() {
int size = 0;
for (ShardIterator shard : iterators) {
size += shard.size();
}
return size;
return iterators.stream().mapToInt(Countable::size).sum();
}
/**

View File

@ -18,70 +18,26 @@
*/
package org.elasticsearch.cluster.routing;
import java.util.Collections;
import java.util.Iterator;
import org.elasticsearch.common.util.PlainIterator;
import java.util.List;
/**
* A simple {@link ShardsIterator} that iterates a list or sub-list of
* {@link ShardRouting shard indexRoutings}.
*/
public class PlainShardsIterator implements ShardsIterator {
private final List<ShardRouting> shards;
// Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile
// keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given
// that although nextOrNull might be called from different threads, it can never happen concurrently.
private volatile int index;
public class PlainShardsIterator extends PlainIterator<ShardRouting> implements ShardsIterator {
public PlainShardsIterator(List<ShardRouting> shards) {
this.shards = shards;
reset();
}
@Override
public void reset() {
index = 0;
}
@Override
public int remaining() {
return shards.size() - index;
}
@Override
public ShardRouting nextOrNull() {
if (index == shards.size()) {
return null;
} else {
return shards.get(index++);
}
}
@Override
public int size() {
return shards.size();
super(shards);
}
@Override
public int sizeActive() {
int count = 0;
for (ShardRouting shard : shards) {
if (shard.active()) {
count++;
}
}
return count;
return Math.toIntExact(getShardRoutings().stream().filter(ShardRouting::active).count());
}
@Override
public List<ShardRouting> getShardRoutings() {
return Collections.unmodifiableList(shards);
}
@Override
public Iterator<ShardRouting> iterator() {
return shards.iterator();
return asList();
}
}

View File

@ -18,12 +18,14 @@
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.util.Countable;
import java.util.List;
/**
* Allows to iterate over unrelated shards.
*/
public interface ShardsIterator extends Iterable<ShardRouting> {
public interface ShardsIterator extends Iterable<ShardRouting>, Countable {
/**
* Resets the iterator to its initial state.

View File

@ -0,0 +1,24 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
public interface Countable {
int size();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class PlainIterator<T> implements Iterable<T>, Countable {
private final List<T> elements;
// Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile
// keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given
// that although nextOrNull might be called from different threads, it can never happen concurrently.
private volatile int index;
public PlainIterator(List<T> elements) {
this.elements = elements;
reset();
}
public void reset() {
index = 0;
}
public int remaining() {
return elements.size() - index;
}
public T nextOrNull() {
if (index == elements.size()) {
return null;
} else {
return elements.get(index++);
}
}
@Override
public int size() {
return elements.size();
}
public List<T> asList() {
return Collections.unmodifiableList(elements);
}
@Override
public Iterator<T> iterator() {
return elements.iterator();
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.set.Sets;
@ -102,7 +101,7 @@ public class AbstractSearchAsyncActionTests extends ESTestCase {
}
@Override
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener) {
}

View File

@ -25,12 +25,12 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
@ -266,7 +266,7 @@ public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final ShardRouting shard,
final SearchShardTarget shard,
final SearchActionListener<SearchPhaseResult> listener) {
if (randomBoolean()) {
listener.onResponse(new SearchPhaseResult() {});

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchContextId;
@ -117,15 +118,15 @@ public class SearchAsyncActionTests extends ESTestCase {
SearchResponse.Clusters.EMPTY) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<TestSearchPhaseResult> listener) {
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
seenShard.computeIfAbsent(shard.getShardId(), (i) -> {
numRequests.incrementAndGet(); // only count this once per replica
return Boolean.TRUE;
});
new Thread(() -> {
Transport.Connection connection = getConnection(null, shard.currentNodeId());
Transport.Connection connection = getConnection(null, shard.getNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()),
connection.getNode());
@ -223,9 +224,9 @@ public class SearchAsyncActionTests extends ESTestCase {
SearchResponse.Clusters.EMPTY) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<TestSearchPhaseResult> listener) {
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
seenShard.computeIfAbsent(shard.getShardId(), (i) -> {
numRequests.incrementAndGet(); // only count this once per shard copy
return Boolean.TRUE;
});
@ -236,10 +237,10 @@ public class SearchAsyncActionTests extends ESTestCase {
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Transport.Connection connection = getConnection(null, shard.currentNodeId());
Transport.Connection connection = getConnection(null, shard.getNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
if (shardFailures[shard.shardId().id()]) {
if (shardFailures[shard.getShardId().id()]) {
listener.onFailure(new RuntimeException());
} else {
listener.onResponse(testSearchPhaseResult);
@ -327,10 +328,10 @@ public class SearchAsyncActionTests extends ESTestCase {
TestSearchResponse response = new TestSearchResponse();
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<TestSearchPhaseResult>
listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(null, shard.currentNodeId());
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<TestSearchPhaseResult> listener) {
assertTrue("shard: " + shard.getShardId() + " has been queried twice", response.queried.add(shard.getShardId()));
Transport.Connection connection = getConnection(null, shard.getNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
Set<ShardSearchContextId> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet());
@ -438,12 +439,12 @@ public class SearchAsyncActionTests extends ESTestCase {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt,
ShardRouting shard,
SearchShardTarget shard,
SearchActionListener<TestSearchPhaseResult> listener) {
assertTrue("shard: " + shard.shardId() + " has been queried twice", response.queried.add(shard.shardId()));
Transport.Connection connection = getConnection(null, shard.currentNodeId());
assertTrue("shard: " + shard.getShardId() + " has been queried twice", response.queried.add(shard.getShardId()));
Transport.Connection connection = getConnection(null, shard.getNodeId());
final TestSearchPhaseResult testSearchPhaseResult;
if (shard.shardId().id() == 0) {
if (shard.getShardId().id() == 0) {
testSearchPhaseResult = new TestSearchPhaseResult(null, connection.getNode());
} else {
testSearchPhaseResult = new TestSearchPhaseResult(new ShardSearchContextId(UUIDs.randomBase64UUID(),
@ -538,14 +539,14 @@ public class SearchAsyncActionTests extends ESTestCase {
SearchResponse.Clusters.EMPTY) {
@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<TestSearchPhaseResult> listener) {
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
seenShard.computeIfAbsent(shard.getShardId(), (i) -> {
numRequests.incrementAndGet(); // only count this once per shard copy
return Boolean.TRUE;
});
new Thread(() -> {
Transport.Connection connection = getConnection(null, shard.currentNodeId());
Transport.Connection connection = getConnection(null, shard.getNodeId());
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(
new ShardSearchContextId(UUIDs.randomBase64UUID(), contextIdGenerator.incrementAndGet()), connection.getNode());
if (shardIt.remaining() > 0) {

View File

@ -34,6 +34,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class SearchShardIteratorTests extends ESTestCase {
public void testShardId() {
@ -63,9 +65,13 @@ public class SearchShardIteratorTests extends ESTestCase {
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
OriginalIndices originalIndices = new OriginalIndices(new String[]{randomAlphaOfLengthBetween(3, 10)},
IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()));
SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId, Collections.emptyList(), originalIndices);
String nodeId = randomAlphaOfLengthBetween(3, 10);
SearchShardTarget searchShardTarget = searchShardIterator.newSearchShardTarget(nodeId);
SearchShardIterator searchShardIterator = new SearchShardIterator(clusterAlias, shardId,
Collections.singletonList(nodeId),originalIndices, null, null);
final SearchShardTarget searchShardTarget = searchShardIterator.nextOrNull();
assertNotNull(searchShardTarget);
assertThat(searchShardTarget.getNodeId(), equalTo(nodeId));
assertEquals(clusterAlias, searchShardTarget.getClusterAlias());
assertSame(shardId, searchShardTarget.getShardId());
assertEquals(nodeId, searchShardTarget.getNodeId());
@ -74,7 +80,7 @@ public class SearchShardIteratorTests extends ESTestCase {
public void testEqualsAndHashcode() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomSearchShardIterator(), s -> new SearchShardIterator(s.getClusterAlias(),
s.shardId(), s.getShardRoutings(), s.getOriginalIndices()), s -> {
s.shardId(), s.getTargetNodeIds(), s.getOriginalIndices(), s.getSearchContextId(), s.getSearchContextKeepAlive()), s -> {
if (randomBoolean()) {
String clusterAlias;
if (s.getClusterAlias() == null) {
@ -82,11 +88,13 @@ public class SearchShardIteratorTests extends ESTestCase {
} else {
clusterAlias = randomBoolean() ? null : s.getClusterAlias() + randomAlphaOfLength(3);
}
return new SearchShardIterator(clusterAlias, s.shardId(), s.getShardRoutings(), s.getOriginalIndices());
return new SearchShardIterator(clusterAlias, s.shardId(), s.getTargetNodeIds(), s.getOriginalIndices(),
s.getSearchContextId(), s.getSearchContextKeepAlive());
} else {
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10),
randomIntBetween(0, Integer.MAX_VALUE));
return new SearchShardIterator(s.getClusterAlias(), shardId, s.getShardRoutings(), s.getOriginalIndices());
return new SearchShardIterator(s.getClusterAlias(), shardId, s.getTargetNodeIds(), s.getOriginalIndices(),
s.getSearchContextId(), s.getSearchContextKeepAlive());
}
});
}
@ -134,7 +142,8 @@ public class SearchShardIteratorTests extends ESTestCase {
public void testCompareToEqualItems() {
SearchShardIterator shardIterator1 = randomSearchShardIterator();
SearchShardIterator shardIterator2 = new SearchShardIterator(shardIterator1.getClusterAlias(), shardIterator1.shardId(),
shardIterator1.getShardRoutings(), shardIterator1.getOriginalIndices());
shardIterator1.getTargetNodeIds(), shardIterator1.getOriginalIndices(), shardIterator1.getSearchContextId(),
shardIterator1.getSearchContextKeepAlive());
assertEquals(shardIterator1, shardIterator2);
assertEquals(0, shardIterator1.compareTo(shardIterator2));
assertEquals(0, shardIterator2.compareTo(shardIterator1));

View File

@ -53,6 +53,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -256,33 +257,33 @@ public class TransportSearchActionTests extends ESTestCase {
assertTrue(iterator.shardId().getId() == 0 || iterator.shardId().getId() == 1);
assertEquals("test_cluster_1", iterator.getClusterAlias());
assertEquals("foo", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "foo");
SearchShardTarget shard = iterator.nextOrNull();
assertNotNull(shard);
assertEquals(shard.getShardId().getIndexName(), "foo");
shard = iterator.nextOrNull();
assertNotNull(shard);
assertEquals(shard.getShardId().getIndexName(), "foo");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("bar")) {
assertArrayEquals(new String[]{"bar"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("test_cluster_1", iterator.getClusterAlias());
assertEquals("bar", iterator.shardId().getIndexName());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "bar");
SearchShardTarget shard = iterator.nextOrNull();
assertNotNull(shard);
assertEquals(shard.getShardId().getIndexName(), "bar");
shard = iterator.nextOrNull();
assertNotNull(shard);
assertEquals(shard.getShardId().getIndexName(), "bar");
assertNull(iterator.nextOrNull());
} else if (iterator.shardId().getIndexName().endsWith("xyz")) {
assertArrayEquals(new String[]{"some_alias_for_xyz"}, iterator.getOriginalIndices().indices());
assertEquals(0, iterator.shardId().getId());
assertEquals("xyz", iterator.shardId().getIndexName());
assertEquals("test_cluster_2", iterator.getClusterAlias());
ShardRouting shardRouting = iterator.nextOrNull();
assertNotNull(shardRouting);
assertEquals(shardRouting.getIndexName(), "xyz");
SearchShardTarget shard = iterator.nextOrNull();
assertNotNull(shard);
assertEquals(shard.getShardId().getIndexName(), "xyz");
assertNull(iterator.nextOrNull());
}
}

View File

@ -155,7 +155,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
List<SearchShardIterator> shuffled = new ArrayList<>(sorted);
Collections.shuffle(shuffled, random());
{
List<ShardIterator> actualIterators = new ArrayList<>();
List<SearchShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = new GroupShardsIterator<>(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);
@ -163,7 +163,7 @@ public class GroupShardsIteratorTests extends ESTestCase {
assertEquals(shuffled, actualIterators);
}
{
List<ShardIterator> actualIterators = new ArrayList<>();
List<SearchShardIterator> actualIterators = new ArrayList<>();
GroupShardsIterator<SearchShardIterator> iter = GroupShardsIterator.sortAndCreate(shuffled);
for (SearchShardIterator searchShardIterator : iter) {
actualIterators.add(searchShardIterator);

View File

@ -32,12 +32,12 @@ import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@ -356,7 +356,7 @@ public class SliceBuilderTests extends ESTestCase {
OperationRouting routing = mock(OperationRouting.class);
GroupShardsIterator<ShardIterator> it = new GroupShardsIterator<>(
Collections.singletonList(
new SearchShardIterator(null, new ShardId("index", "index", 1), null, null)
new PlainShardIterator(new ShardId("index", "index", 1), Collections.emptyList())
)
);
when(routing.searchShards(any(), any(), any(), any())).thenReturn(it);

View File

@ -19,6 +19,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@ -38,14 +39,20 @@ import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
public class PointInTimeIT extends ESIntegTestCase {
@ -129,30 +136,95 @@ public class PointInTimeIT extends ESIntegTestCase {
client().prepareIndex(index, "_doc").setId(id).setSource("value", i).get();
}
refresh();
String readerId = openPointInTime(new String[] { "*" }, TimeValue.timeValueMinutes(2));
SearchResponse resp1 = client().prepareSearch().setPreference(null).setSearchContext(readerId, TimeValue.timeValueMinutes(2)).get();
assertNoFailures(resp1);
assertHitCount(resp1, numDocs);
int moreDocs = randomIntBetween(10, 50);
for (int i = 0; i < moreDocs; i++) {
String id = "more-" + i;
String index = "index-" + randomIntBetween(1, numIndices);
client().prepareIndex(index, "_doc").setId(id).setSource("value", i).get();
String pitId = openPointInTime(new String[]{"*"}, TimeValue.timeValueMinutes(2));
try {
SearchResponse resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2))
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
pitId = resp.pointInTimeId();
int moreDocs = randomIntBetween(10, 50);
for (int i = 0; i < moreDocs; i++) {
String id = "more-" + i;
String index = "index-" + randomIntBetween(1, numIndices);
client().prepareIndex(index, "_doc").setId(id).setSource("value", i).get();
}
refresh();
resp = client().prepareSearch().get();
assertNoFailures(resp);
assertHitCount(resp, numDocs + moreDocs);
resp = client().prepareSearch().setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(1)).get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
pitId = resp.pointInTimeId();
} finally {
closePointInTime(pitId);
}
}
public void testRelocation() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(4);
createIndex("test", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)).build());
ensureGreen("test");
int numDocs = randomIntBetween(10, 50);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "_doc").setId(Integer.toString(i)).setSource("value", i).get();
}
refresh();
String pitId = openPointInTime(new String[]{"test"}, TimeValue.timeValueMinutes(2));
try {
SearchResponse resp2 = client().prepareSearch().get();
assertNoFailures(resp2);
assertHitCount(resp2, numDocs + moreDocs);
SearchResponse resp3 = client().prepareSearch()
.setPreference(null)
.setSearchContext(resp1.pointInTimeId(), TimeValue.timeValueMinutes(1))
SearchResponse resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2))
.get();
assertNoFailures(resp3);
assertHitCount(resp3, numDocs);
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
final Set<String> dataNodes = StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)
.map(e -> e.value.getId()).collect(Collectors.toSet());
final List<String> excludedNodes = randomSubsetOf(2, dataNodes);
assertAcked(client().admin().indices().prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.exclude._id", String.join(",", excludedNodes)).build()));
if (randomBoolean()) {
int moreDocs = randomIntBetween(10, 50);
for (int i = 0; i < moreDocs; i++) {
client().prepareIndex("test", "_doc").setId("more-" + i).setSource("value", i).get();
}
refresh();
}
resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2))
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
assertBusy(() -> {
final Set<String> assignedNodes = clusterService().state().routingTable().allShards().stream()
.filter(shr -> shr.index().getName().equals("test") && shr.assignedToNode())
.map(ShardRouting::currentNodeId)
.collect(Collectors.toSet());
assertThat(assignedNodes, everyItem(not(in(excludedNodes))));
}, 30, TimeUnit.SECONDS);
resp = client().prepareSearch()
.setPreference(null).setSearchContext(pitId, TimeValue.timeValueMinutes(2))
.get();
assertNoFailures(resp);
assertHitCount(resp, numDocs);
assertNotNull(resp.pointInTimeId());
if (randomBoolean()) {
pitId = resp.pointInTimeId();
}
} finally {
closePointInTime(resp1.pointInTimeId());
closePointInTime(pitId);
}
}