handle nodes that are not connected early in AbstractSearchAsyncAction

This commit is contained in:
Simon Willnauer 2017-01-04 11:23:33 +01:00
parent 422cd1ef77
commit 31499a1248

View File

@ -41,6 +41,7 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import java.util.List;
@ -120,30 +121,33 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// TODO upgrade this to an assert...
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
if (connection == null) {
onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
filter, indexBoost, startTime());
sendExecuteFirstPhase(connection, transportRequest , new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
}
sendExecuteFirstPhase(connection, transportRequest, new ActionListener<FirstResult>() {
@Override
public void onResponse(FirstResult result) {
onFirstPhaseResult(shardIndex, shard.currentNodeId(), result, shardIt);
}
@Override
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
}
});
@Override
public void onFailure(Exception t) {
onFirstPhaseResult(shardIndex, shard, connection.getNode().getId(), shardIt, t);
}
});
} catch (ConnectTransportException | IllegalArgumentException ex) {
// we are getting the connection early here so we might run into nodes that are not connected. in that case we move on to
// the next shard.
onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}
}
}