Search / Broadcast concurrency bug can result in response corruption / errors, closes #1152.

This commit is contained in:
Shay Banon 2011-07-24 15:40:01 +03:00
parent 5e78f14f04
commit b31f68a0eb
6 changed files with 73 additions and 14 deletions

View File

@ -94,6 +94,13 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
return shardIt.nextAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/

View File

@ -103,6 +103,13 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
return shardIt.nextAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/

View File

@ -133,13 +133,13 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.nextActiveOrNull();
final ShardRouting shard = shardIt.firstActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performFirstPhase(shardIt.reset());
performFirstPhase(shardIt);
}
} else {
// really, no shards active in this group
@ -153,10 +153,10 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
final ShardRouting shard = shardIt.firstActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIt.reset());
performFirstPhase(shardIt);
}
}
}
@ -168,17 +168,17 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
request.beforeLocalFork();
}
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
final ShardRouting shard = shardIt.firstActiveOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
performFirstPhase(shardIt.reset());
performFirstPhase(shardIt);
}
});
} else {
performFirstPhase(shardIt.reset());
performFirstPhase(shardIt);
}
}
}

View File

@ -99,12 +99,17 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link org.elasticsearch.cluster.routing.ShardIterator#nextActiveOrNull()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardIterator)}.
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardIterator)},
* and {@link #firstShardOrNull(org.elasticsearch.cluster.routing.ShardIterator)}.
*/
protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
return shardIt.nextActiveOrNull();
}
protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstActiveOrNull();
}
/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link org.elasticsearch.cluster.routing.ShardIterator#hasNextActive()}.
@ -175,13 +180,13 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = nextShardOrNull(shardIt);
final ShardRouting shard = firstShardOrNull(shardIt);
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
} else {
// do the remote operation here, the localAsync flag is not relevant
performOperation(shardIt.reset(), true);
performOperation(shardIt, true);
}
} else {
// really, no shards active in this group
@ -195,10 +200,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = nextShardOrNull(shardIt.reset());
final ShardRouting shard = firstShardOrNull(shardIt);
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), false);
performOperation(shardIt, false);
}
}
}
@ -210,10 +215,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
request.beforeLocalFork();
}
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = nextShardOrNull(shardIt.reset());
final ShardRouting shard = firstShardOrNull(shardIt);
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), localAsync);
performOperation(shardIt, localAsync);
}
}
}

View File

@ -125,6 +125,18 @@ public class PlainShardsIterator implements ShardsIterator {
return null;
}
@Override public ShardRouting firstActiveOrNull() {
int counter = 0;
int index = this.origIndex;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return shardRouting;
}
}
return null;
}
@Override public int sizeAssigned() {
int shardsAssigned = 0;
for (ShardRouting shardRouting : shards) {
@ -171,6 +183,18 @@ public class PlainShardsIterator implements ShardsIterator {
return null;
}
@Override public ShardRouting firstAssignedOrNull() {
int counter = 0;
int index = this.origIndex;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return shardRouting;
}
}
return null;
}
final ShardRouting shardModulo(int counter) {
return shards.get((counter % size));
}

View File

@ -67,6 +67,14 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
*/
ShardRouting nextActiveOrNull();
/**
* Returns the first active shard, or <tt>null</tt>, without
* incrementing the iterator.
*
* @see ShardRouting#active()
*/
ShardRouting firstActiveOrNull();
/**
* The number of assigned shard routing instances.
*
@ -95,6 +103,14 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
*/
ShardRouting nextAssignedOrNull();
/**
* Returns the first assigned shard, or <tt>null</tt>, wuthout
* incrementing the iterator.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting firstAssignedOrNull();
int hashCode();
boolean equals(Object other);