Search on a shard group while relocation final flip happens might fail

make sure relocation shards add their corresponding initializing shard routing when search across initializing shards

also, make shardFailures lazy again

closes 
This commit is contained in:
Shay Banon 2013-08-02 00:20:10 +02:00
parent ebda203ce6
commit 235b3a3635
3 changed files with 45 additions and 47 deletions
src/main/java/org/elasticsearch

View File

@ -94,7 +94,8 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
private final AtomicInteger totalOps = new AtomicInteger();
protected final AtomicArray<FirstResult> firstResults;
private final AtomicArray<ShardSearchFailure> shardFailures;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardList;
protected final long startTime = System.currentTimeMillis();
@ -127,7 +128,6 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
firstResults = new AtomicArray<FirstResult>(shardsIts.size());
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
}
public void start() {
@ -310,6 +310,7 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
protected final ShardSearchFailure[] buildShardFailures() {
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures == null) {
return ShardSearchFailure.EMPTY_ARRAY;
}
@ -322,6 +323,14 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
}
protected final void addShardFailure(final int shardIndex, Throwable t) {
// lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
if (shardFailures == null) {
synchronized (shardFailuresMutex) {
if (shardFailures == null) {
shardFailures = new AtomicArray<ShardSearchFailure>(shardsIts.size());
}
}
}
ShardSearchFailure failure = shardFailures.get(shardIndex);
if (failure == null) {
shardFailures.set(shardIndex, new ShardSearchFailure(t));
@ -376,9 +385,15 @@ public abstract class TransportSearchTypeAction extends TransportAction<SearchRe
protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
firstResults.set(shardIndex, result);
// clean a previous error on this shard group
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
if (shardFailures != null) {
shardFailures.set(shardIndex, null);
}
}
final void innerMoveToSecondPhase() throws Exception {
moveToSecondPhase();

View File

@ -173,44 +173,18 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
return new PlainShardIterator(shardId(), asList);
}
/**
* Reads a {@link ImmutableShardRouting} instance of a shard from an {@link InputStream}
*
* @param in {@link InputStream} to read the entry from
* @return {@link ImmutableShardRouting} instances read from the given {@link InputStream}
* @throws IOException if some exception occurs during the read operations
*/
public static ImmutableShardRouting readShardRoutingEntry(StreamInput in) throws IOException {
ImmutableShardRouting entry = new ImmutableShardRouting();
entry.readFrom(in);
return entry;
}
/**
* Reads a routingentry from an inputstream with given <code>index</code> and
* <code>shardId</code>.
*
* @param in inputstream to read the entry from
* @param index shards index
* @param id id of the shard
* @return Shard routing entry read
* @throws IOException if some exception occurs during the read operations
*/
public static ImmutableShardRouting readShardRoutingEntry(StreamInput in, String index, int shardId) throws IOException {
ImmutableShardRouting entry = new ImmutableShardRouting();
entry.readFrom(in, index, shardId);
return entry;
}
/**
* Read information from an inputstream with given <code>index</code> and
* <code>shardId</code>.
*
* @param in inputstream to read the entry from
* @param index shards index
* @param id id of the shard
* @throws IOException if some exception occurs during the read operations
*/
public void readFrom(StreamInput in, String index, int shardId) throws IOException {
this.index = index;
this.shardId = shardId;

View File

@ -52,7 +52,12 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final ImmutableList<ShardRouting> initializingShards;
/**
* The initializing list, including ones that are initializing on a target node because of relocation.
* If we can come up with a better variable name, it would be nice...
*/
final ImmutableList<ShardRouting> allInitializingShards;
final AtomicInteger counter;
@ -68,7 +73,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ImmutableList.Builder<ShardRouting> replicas = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> activeShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> assignedShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> initializingShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> allInitializingShards = ImmutableList.builder();
for (ShardRouting shard : shards) {
if (shard.primary()) {
@ -80,7 +85,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
activeShards.add(shard);
}
if (shard.initializing()) {
initializingShards.add(shard);
allInitializingShards.add(shard);
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(new ImmutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(), shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version()));
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
@ -96,7 +105,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
this.replicas = replicas.build();
this.activeShards = activeShards.build();
this.assignedShards = assignedShards.build();
this.initializingShards = initializingShards.build();
this.allInitializingShards = allInitializingShards.build();
}
/**
@ -285,12 +294,12 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
* its random within the active shards, and initializing shards are the last to iterate through.
*/
public ShardIterator activeInitializingShardsIt(int index) {
if (initializingShards.isEmpty()) {
if (allInitializingShards.isEmpty()) {
return new PlainShardIterator(shardId, activeShards, index);
}
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
addToListFromIndex(activeShards, ordered, index);
ordered.addAll(initializingShards);
ordered.addAll(allInitializingShards);
return new PlainShardIterator(shardId, ordered);
}
@ -322,7 +331,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
int index = Math.abs(pickIndex());
for (int i = 0; i < activeShards.size(); i++) {
@ -336,14 +345,14 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
}
// no need to worry about primary first here..., its temporal
if (!initializingShards.isEmpty()) {
ordered.addAll(initializingShards);
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
for (int i = 0; i < activeShards.size(); i++) {
ShardRouting shardRouting = activeShards.get(i);
@ -351,8 +360,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ordered.add(shardRouting);
}
}
for (int i = 0; i < initializingShards.size(); i++) {
ShardRouting shardRouting = initializingShards.get(i);
for (int i = 0; i < allInitializingShards.size(); i++) {
ShardRouting shardRouting = allInitializingShards.get(i);
if (nodeId.equals(shardRouting.currentNodeId())) {
ordered.add(shardRouting);
}
@ -361,7 +370,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + initializingShards.size());
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
int index = pickIndex();
for (int i = 0; i < activeShards.size(); i++) {
@ -374,8 +383,8 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ordered.set(0, shardRouting);
}
}
if (!initializingShards.isEmpty()) {
ordered.addAll(initializingShards);
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
@ -447,7 +456,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
AttributesRoutings shardRoutings = initializingShardsByAttributes.get(key);
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(initializingShards);
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(allInitializingShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : key.attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);