Search on a shard group while relocation final flip happens might fail

At the final stage of a relocation, during the final flip of the states, a search request might hit a node that would then execute it on a shard that has already relocated.

For this, we need to execute broadcast and search operations against initializing shards as well, but only as a last resort. The operation will be rejected if not applicable (i.e. IndexShard#searcher() checked for read allowed).

Note, this requires careful though about which failures we send back. If we try and initializing shard and it fails, its failure should not override an actual failure of an active shard.

Also, removed an atomic integer used in broadcast request and use a similar shard index trick we now have in our search execution.

closes #3427
This commit is contained in:
Shay Banon 2013-08-01 18:35:58 +02:00
parent 60bddc28eb
commit f3d3a8bd58
13 changed files with 345 additions and 196 deletions

View File

@ -21,12 +21,17 @@ package org.elasticsearch.action;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
/**
*
*/
public class NoShardAvailableActionException extends IndexShardException {
public NoShardAvailableActionException(ShardId shardId) {
super(shardId, null);
}
public NoShardAvailableActionException(ShardId shardId, String msg) {
super(shardId, msg);
}
@ -35,4 +40,8 @@ public class NoShardAvailableActionException extends IndexShardException {
super(shardId, msg, cause);
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.rest.RestStatus;
/**
*
@ -42,6 +43,23 @@ public class SearchPhaseExecutionException extends ElasticSearchException {
this.shardFailures = shardFailures;
}
@Override
public RestStatus status() {
if (shardFailures.length == 0) {
// if no successful shards, it means no active shards, so just return SERVICE_UNAVAILABLE
return RestStatus.SERVICE_UNAVAILABLE;
}
RestStatus status = shardFailures[0].status();
if (shardFailures.length > 1) {
for (int i = 1; i < shardFailures.length; i++) {
if (shardFailures[i].status().getStatus() >= 500) {
status = shardFailures[i].status();
}
}
}
return status;
}
public String phaseName() {
return phaseName;
}

View File

@ -41,16 +41,21 @@ public class ShardSearchFailure implements ShardOperationFailedException {
public static final ShardSearchFailure[] EMPTY_ARRAY = new ShardSearchFailure[0];
private SearchShardTarget shardTarget;
private String reason;
private RestStatus status;
private transient Throwable failure;
private ShardSearchFailure() {
}
@Nullable
public Throwable failure() {
return failure;
}
public ShardSearchFailure(Throwable t) {
this.failure = t;
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual != null && actual instanceof SearchException) {
this.shardTarget = ((SearchException) actual).shard();

View File

@ -20,7 +20,10 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@ -144,7 +147,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -20,7 +20,10 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@ -153,7 +156,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
@ -246,7 +249,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -20,7 +20,10 @@
package org.elasticsearch.action.search.type;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@ -155,7 +158,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t));
AsyncAction.this.addShardFailure(shardIndex, t);
successulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();

View File

@ -20,7 +20,9 @@
package org.elasticsearch.action.search.type;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
@ -35,7 +37,9 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.trove.ExtTIntArrayList;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
@ -90,7 +94,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
private final AtomicInteger totalOps = new AtomicInteger();
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final AtomicArray<ShardSearchFailure> shardFailures;
protected volatile ScoreDoc[] sortedShardList;
protected final long startTime = System.currentTimeMillis();
@ -123,6 +127,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
firstResults = new AtomicArray<FirstResult>(shardsIts.size());
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
}
public void start() {
@ -142,7 +147,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, shardIt, null);
onFirstPhaseResult(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}
// we have local operations, perform them now
@ -200,11 +205,11 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(shardIndex, null, shardIt, null);
onFirstPhaseResult(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shardIndex, shard, shardIt, null);
onFirstPhaseResult(shardIndex, shard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
} else {
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
@ -242,6 +247,10 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
addShardFailure(shardIndex, t);
if (totalOps.incrementAndGet() == expectedTotalOps) {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
@ -253,16 +262,9 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
}
// no more shards, add a failure
if (t == null) {
// no active shards
addShardFailure(shardIndex, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(shardIndex, new ShardSearchFailure(t));
}
if (successulOps.get() == 0) {
// no successful ops, raise an exception
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures()));
listener.onFailure(new SearchPhaseExecutionException(firstPhaseName(), "all shards failed", buildShardFailures()));
} else {
try {
innerMoveToSecondPhase();
@ -296,12 +298,6 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
}
}
if (t == null) {
// no active shards
addShardFailure(shardIndex, new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()), RestStatus.SERVICE_UNAVAILABLE));
} else {
addShardFailure(shardIndex, new ShardSearchFailure(t));
}
}
}
}
@ -325,13 +321,34 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
return failures;
}
// we do our best to return the shard failures, but its ok if its not fully concurrently safe
// we simply try and return as much as possible
protected final void addShardFailure(final int shardIndex, ShardSearchFailure failure) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
protected final void addShardFailure(final int shardIndex, Throwable t) {
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t));
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (isOverrideException(t)) {
shardFailures.set(shardIndex, new ShardSearchFailure(t));
}
shardFailures.set(shardIndex, failure);
}
}
protected boolean isOverrideException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return false;
}
if (actual instanceof IndexMissingException) {
return false;
}
if (actual instanceof IndexShardMissingException) {
return false;
}
if (actual instanceof NoShardAvailableActionException) {
return false;
}
return false;
}
/**
@ -359,6 +376,8 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
firstResults.set(shardIndex, result);
// clean a previous error on this shard group
shardFailures.set(shardIndex, null);
}
final void innerMoveToSecondPhase() throws Exception {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.broadcast;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@ -154,8 +155,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
private final AtomicInteger counterOps = new AtomicInteger();
private final AtomicInteger indexCounter = new AtomicInteger();
private final AtomicReferenceArray shardsResponses;
AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
@ -190,18 +189,20 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
request.beforeStart();
// count the local operations, and perform the non local ones
int localOperations = 0;
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performOperation(shardIt, true);
performOperation(shardIt, shardIndex, true);
}
} else {
// really, no shards active in this group
onOperation(null, shardIt, null);
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
}
}
// we have local operations, perform them now
@ -211,11 +212,13 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt, false);
performOperation(shardIt, shardIndex, false);
}
}
}
@ -226,11 +229,13 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
if (localAsync) {
request.beforeLocalFork();
}
shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt, localAsync);
performOperation(shardIt, shardIndex, localAsync);
}
}
}
@ -238,14 +243,14 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
void performOperation(final ShardIterator shardIt, boolean localAsync) {
performOperation(shardIt, shardIt.nextOrNull(), localAsync);
void performOperation(final ShardIterator shardIt, int shardIndex, boolean localAsync) {
performOperation(shardIt, shardIt.nextOrNull(), shardIndex, localAsync);
}
void performOperation(final ShardIterator shardIt, final ShardRouting shard, boolean localAsync) {
void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex, boolean localAsync) {
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, null);
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
final ShardRequest shardRequest = newShardRequest(shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
@ -254,24 +259,24 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
@Override
public void run() {
try {
onOperation(shard, shardOperation(shardRequest));
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Exception e) {
onOperation(shard, shardIt, e);
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {
try {
onOperation(shard, shardOperation(shardRequest));
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, e);
onOperation(shard, shardIt, shardIndex, e);
}
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, null);
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
@ -286,12 +291,12 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, response);
onOperation(shard, shardIndex, response);
}
@Override
public void handleException(TransportException e) {
onOperation(shard, shardIt, e);
onOperation(shard, shardIt, shardIndex, e);
}
});
}
@ -300,15 +305,19 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
@SuppressWarnings({"unchecked"})
void onOperation(ShardRouting shard, ShardResponse response) {
shardsResponses.set(indexCounter.getAndIncrement(), response);
void onOperation(ShardRouting shard, int shardIndex, ShardResponse response) {
shardsResponses.set(shardIndex, response);
if (expectedOps == counterOps.incrementAndGet()) {
finishHim();
}
}
@SuppressWarnings({"unchecked"})
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Throwable t) {
// we set the shard failure always, even if its the first in the replication group, and the next one
// will work (it will just override it...)
setFailure(shardIt, shardIndex, t);
ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) {
if (t != null) {
@ -327,7 +336,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// or we possibly threaded if we got from a local threaded one,
// in which case, the next shard in the partition will not be local one
// so there is no meaning to this flag
performOperation(shardIt, nextShard, true);
performOperation(shardIt, nextShard, shardIndex, true);
} else {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
@ -341,24 +350,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
}
}
// no more shards in this group
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
if (t == null) {
if (!ignoreNonActiveExceptions()) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), "No active shard(s)");
}
} else {
if (ignoreException(t)) {
t = null;
} else {
if (!(t instanceof BroadcastShardOperationFailedException)) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
}
}
}
shardsResponses.set(index, t);
}
if (expectedOps == counterOps.incrementAndGet()) {
finishHim();
}
@ -368,6 +359,56 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
void finishHim() {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
}
void setFailure(ShardIterator shardIt, int shardIndex, Throwable t) {
if (!accumulateExceptions()) {
return;
}
if (ignoreNonActiveExceptions() && t instanceof NoShardAvailableActionException) {
return;
}
if (ignoreException(t)) {
return;
}
if (!(t instanceof BroadcastShardOperationFailedException)) {
t = new BroadcastShardOperationFailedException(shardIt.shardId(), t);
}
Object response = shardsResponses.get(shardIndex);
if (response == null) {
// just override it and return
shardsResponses.set(shardIndex, t);
}
if (!(response instanceof Throwable)) {
// we should never really get here...
return;
}
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (isOverrideException(t)) {
shardsResponses.set(shardIndex, t);
}
}
}
protected boolean isOverrideException(Throwable t) {
Throwable actual = ExceptionsHelper.unwrapCause(t);
if (actual instanceof IllegalIndexShardStateException) {
return false;
}
if (actual instanceof IndexMissingException) {
return false;
}
if (actual instanceof IndexShardMissingException) {
return false;
}
if (actual instanceof NoShardAvailableActionException) {
return false;
}
return false;
}
class TransportHandler extends BaseTransportRequestHandler<Request> {

View File

@ -52,6 +52,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final ImmutableList<ShardRouting> initializingShards;
final AtomicInteger counter;
@ -67,6 +68,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ImmutableList.Builder<ShardRouting> replicas = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> activeShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> assignedShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> initializingShards = ImmutableList.builder();
for (ShardRouting shard : shards) {
if (shard.primary()) {
@ -77,6 +79,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
if (shard.active()) {
activeShards.add(shard);
}
if (shard.initializing()) {
initializingShards.add(shard);
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
@ -91,6 +96,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
this.replicas = replicas.build();
this.activeShards = activeShards.build();
this.assignedShards = assignedShards.build();
this.initializingShards = initializingShards.build();
}
/**
@ -243,7 +249,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shards, counter.getAndIncrement());
return new PlainShardIterator(shardId, shards, pickIndex());
}
public ShardIterator shardsIt() {
@ -255,7 +261,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator activeShardsRandomIt() {
return new PlainShardIterator(shardId, activeShards, counter.getAndIncrement());
return new PlainShardIterator(shardId, activeShards, pickIndex());
}
public ShardIterator activeShardsIt() {
@ -266,8 +272,30 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, activeShards, index);
}
/**
* Returns an iterator over active and initializing shards. Making sure though that
* its random within the active shards, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsRandomIt() {
return activeInitializingShardsIt(pickIndex());
}
/**
* Returns an iterator over active and initializing shards. Making sure though that
* its random within the active shards, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsIt(int index) {
if (initializingShards.isEmpty()) {
return new PlainShardIterator(shardId, activeShards, index);
}
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
addToListFromIndex(activeShards, ordered, index);
ordered.addAll(initializingShards);
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator assignedShardsRandomIt() {
return new PlainShardIterator(shardId, assignedShards, counter.getAndIncrement());
return new PlainShardIterator(shardId, assignedShards, pickIndex());
}
public ShardIterator assignedShardsIt() {
@ -285,18 +313,18 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, primaryAsList);
}
public ShardIterator primaryActiveShardIt() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active()) {
public ShardIterator primaryActiveInitializingShardIt() {
if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) {
List<ShardRouting> primaryList = ImmutableList.of();
return new PlainShardIterator(shardId, primaryList);
}
return primaryShardIt();
}
public ShardIterator primaryFirstActiveShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size());
public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
// fill it in a randomized fashion
int index = Math.abs(counter.getAndIncrement());
int index = Math.abs(pickIndex());
for (int i = 0; i < activeShards.size(); i++) {
int loc = (index + i) % activeShards.size();
ShardRouting shardRouting = activeShards.get(loc);
@ -307,21 +335,24 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ordered.set(0, shardRouting);
}
}
// no need to worry about primary first here..., its temporal
if (!initializingShards.isEmpty()) {
ordered.addAll(initializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
/**
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferNodeShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, shards);
}
public ShardIterator onlyNodeActiveShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(shards.size());
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
// fill it in a randomized fashion
for (int i = 0; i < shards.size(); i++) {
ShardRouting shardRouting = shards.get(i);
for (int i = 0; i < activeShards.size(); i++) {
ShardRouting shardRouting = activeShards.get(i);
if (nodeId.equals(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
}
for (int i = 0; i < initializingShards.size(); i++) {
ShardRouting shardRouting = initializingShards.get(i);
if (nodeId.equals(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
@ -329,27 +360,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
/**
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferNodeActiveShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, activeShards);
}
/**
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferNodeAssignedShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, assignedShards);
}
private ShardIterator preferNodeShardsIt(String nodeId, ImmutableList<ShardRouting> shards) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(shards.size());
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
// fill it in a randomized fashion
int index = Math.abs(counter.getAndIncrement());
for (int i = 0; i < shards.size(); i++) {
int loc = (index + i) % shards.size();
ShardRouting shardRouting = shards.get(loc);
int index = pickIndex();
for (int i = 0; i < activeShards.size(); i++) {
int loc = (index + i) % activeShards.size();
ShardRouting shardRouting = activeShards.get(loc);
ordered.add(shardRouting);
if (nodeId.equals(shardRouting.currentNodeId())) {
// switch, its the matching node id
@ -357,6 +374,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ordered.set(0, shardRouting);
}
}
if (!initializingShards.isEmpty()) {
ordered.addAll(initializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
@ -393,20 +413,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = ImmutableMap.of();
private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = ImmutableMap.of();
private final Object shardsByAttributeMutex = new Object();
public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes) {
return preferAttributesActiveShardsIt(attributes, nodes, counter.incrementAndGet());
}
public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes, int index) {
AttributesKey key = new AttributesKey(attributes);
private AttributesRoutings getActiveAttribute(AttributesKey key, DiscoveryNodes nodes) {
AttributesRoutings shardRoutings = activeShardsByAttributes.get(key);
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(activeShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : attributes) {
for (String attribute : key.attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue == null) {
continue;
@ -424,21 +440,53 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
activeShardsByAttributes = MapBuilder.newMapBuilder(activeShardsByAttributes).put(key, shardRoutings).immutableMap();
}
}
// we now randomize, once between the ones that have the same attributes, and once for the ones that don't
// we don't want to mix between the two!
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(shardRoutings.totalSize);
index = Math.abs(index);
for (int i = 0; i < shardRoutings.withSameAttribute.size(); i++) {
int loc = (index + i) % shardRoutings.withSameAttribute.size();
ShardRouting shardRouting = shardRoutings.withSameAttribute.get(loc);
ordered.add(shardRouting);
}
for (int i = 0; i < shardRoutings.withoutSameAttribute.size(); i++) {
int loc = (index + i) % shardRoutings.withoutSameAttribute.size();
ShardRouting shardRouting = shardRoutings.withoutSameAttribute.get(loc);
ordered.add(shardRouting);
return shardRoutings;
}
private AttributesRoutings getInitializingAttribute(AttributesKey key, DiscoveryNodes nodes) {
AttributesRoutings shardRoutings = initializingShardsByAttributes.get(key);
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(initializingShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : key.attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue == null) {
continue;
}
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
iterator.remove();
to.add(fromShard);
}
}
}
shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
initializingShardsByAttributes = MapBuilder.newMapBuilder(initializingShardsByAttributes).put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
return preferAttributesActiveInitializingShardsIt(attributes, nodes, pickIndex());
}
public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int index) {
AttributesKey key = new AttributesKey(attributes);
AttributesRoutings activeRoutings = getActiveAttribute(key, nodes);
AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes);
// we now randomize, once between the ones that have the same attributes, and once for the ones that don't
// we don't want to mix between the two!
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeRoutings.totalSize + initializingRoutings.totalSize);
index = Math.abs(index);
addToListFromIndex(activeRoutings.withSameAttribute, ordered, index);
addToListFromIndex(activeRoutings.withoutSameAttribute, ordered, index);
addToListFromIndex(initializingRoutings.withSameAttribute, ordered, index);
addToListFromIndex(initializingRoutings.withoutSameAttribute, ordered, index);
return new PlainShardIterator(shardId, ordered);
}
@ -462,6 +510,23 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shards;
}
/**
* Adds from list to list, starting from the given index (wrapping around if needed).
*/
@SuppressWarnings("unchecked")
private void addToListFromIndex(List from, List to, int index) {
index = Math.abs(index);
for (int i = 0; i < from.size(); i++) {
int loc = (index + i) % from.size();
to.add(from.get(loc));
}
}
// TODO: we can move to random based on ThreadLocalRandom, or make it pluggable
private int pickIndex() {
return Math.abs(counter.incrementAndGet());
}
public static class Builder {
private ShardId shardId;

View File

@ -41,7 +41,6 @@ import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@ -187,9 +186,9 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
if (preference == null) {
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsRandomIt();
return indexShard.activeInitializingShardsRandomIt();
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes);
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
}
}
if (preference.charAt(0) == '_') {
@ -217,9 +216,9 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
if (index == -1 || index == preference.length() - 1) {
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsRandomIt();
return indexShard.activeInitializingShardsRandomIt();
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes);
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
}
} else {
// update the preference and continue
@ -227,30 +226,30 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
}
if (preference.startsWith("_prefer_node:")) {
return indexShard.preferNodeActiveShardsIt(preference.substring("_prefer_node:".length()));
return indexShard.preferNodeActiveInitializingShardsIt(preference.substring("_prefer_node:".length()));
}
if ("_local".equals(preference)) {
return indexShard.preferNodeActiveShardsIt(localNodeId);
return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
}
if ("_primary".equals(preference)) {
return indexShard.primaryActiveShardIt();
return indexShard.primaryActiveInitializingShardIt();
}
if ("_primary_first".equals(preference) || "_primaryFirst".equals(preference)) {
return indexShard.primaryFirstActiveShardsIt();
return indexShard.primaryFirstActiveInitializingShardsIt();
}
if ("_only_local".equals(preference) || "_onlyLocal".equals(preference)) {
return indexShard.onlyNodeActiveShardsIt(localNodeId);
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
}
if (preference.startsWith("_only_node:")) {
return indexShard.onlyNodeActiveShardsIt(preference.substring("_only_node:".length()));
return indexShard.onlyNodeActiveInitializingShardsIt(preference.substring("_only_node:".length()));
}
}
// if not, then use it as the index
String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes();
if (awarenessAttributes.length == 0) {
return indexShard.activeShardsIt(DjbHashFunction.DJB_HASH(preference));
return indexShard.activeInitializingShardsIt(DjbHashFunction.DJB_HASH(preference));
} else {
return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes, DjbHashFunction.DJB_HASH(preference));
return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, DjbHashFunction.DJB_HASH(preference));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.test.integration.indices.settings;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.count.CountResponse;
@ -39,22 +38,7 @@ import static org.hamcrest.Matchers.equalTo;
public class UpdateNumberOfReplicasTests extends AbstractSharedClusterTest {
/*
* Comment from Boaz on the dev@ list:
*
* A short update on the failing
* UpdateNumberOfReplicasTest.simpleUpdateNumberOfReplicasTests - Shay and I
* pinned down the source of the problem - it's caused by making searches
* based on dated knowledge of the cluster state and calling shards that
* have been relocating away in the mean time.
*
* I'll be working on a a fix (when searching on a shard that is in the
* process of relocating, fail over to the relocation target if the search
* to the relocation source failed), but will it take a couple of days to
* complete.
*/
@Test
@AwaitsFix(bugUrl = "Boaz is on it ;)")
public void simpleUpdateNumberOfReplicasTests() throws Exception {
logger.info("Creating index test");
prepareCreate("test", 2).execute().actionGet();

View File

@ -109,7 +109,7 @@ public class SimpleQueryTests extends AbstractSharedClusterTest {
try {
client().prepareSearch().setQuery(QueryBuilders.matchQuery("field1", "quick brown").type(MatchQueryBuilder.Type.PHRASE).slop(0)).get();
} catch (SearchPhaseExecutionException e) {
assertTrue(e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }"));
assertTrue("wrong exception message " + e.getMessage(), e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }"));
}
}

View File

@ -282,7 +282,7 @@ public class RoutingIteratorTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// after all are started, check routing iteration
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes());
ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
ShardRouting shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));
@ -290,7 +290,7 @@ public class RoutingIteratorTests {
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node2"));
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes());
shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes());
shardRouting = shardIterator.nextOrNull();
assertThat(shardRouting, notNullValue());
assertThat(shardRouting.currentNodeId(), equalTo("node1"));