fix a case where node might not exists on relocating, don't fail the replication action (index / delete / ...) because of it

This commit is contained in:
kimchy 2010-08-19 13:08:20 +03:00
parent 51656552a5
commit 27e92cd505
1 changed files with 61 additions and 36 deletions

View File

@ -410,23 +410,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// initialize the counter // initialize the counter
int replicaCounter = 0; int replicaCounter = 0;
if (replicationType == ReplicationType.ASYNC) { for (final ShardRouting shard : shards.reset()) {
// async replication, notify the listener // if its unassigned, nothing to do here...
if (alreadyThreaded || !request.listenerThreaded()) { if (shard.unassigned()) {
listener.onResponse(response); continue;
} else {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
}
// now, trick the counter so it won't decrease to 0
replicaCounter = -100;
} }
for (final ShardRouting shard : shards.reset()) {
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well // if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
// (and we already did it on the primary)
if (shard.primary()) { if (shard.primary()) {
if (shard.relocating()) { if (shard.relocating()) {
replicaCounter++; replicaCounter++;
@ -442,8 +433,43 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
} }
if (replicaCounter == 0) {
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response);
} else {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
}
return;
}
if (replicationType == ReplicationType.ASYNC) {
// async replication, notify the listener
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response);
} else {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
}
// now, trick the counter so it won't decrease to 0 and notify the listeners
replicaCounter = -100;
}
AtomicInteger counter = new AtomicInteger(replicaCounter); AtomicInteger counter = new AtomicInteger(replicaCounter);
for (final ShardRouting shard : shards.reset()) { for (final ShardRouting shard : shards.reset()) {
// if its unassigned, nothing to do here...
if (shard.unassigned()) {
continue;
}
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
// (and we already did it on the primary)
boolean doOnlyOnRelocating = false; boolean doOnlyOnRelocating = false;
if (shard.primary()) { if (shard.primary()) {
if (shard.relocating()) { if (shard.relocating()) {
@ -455,10 +481,19 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we index on a replica that is initializing as well since we might not have got the event // we index on a replica that is initializing as well since we might not have got the event
// yet that it was started. We will get an exception IllegalShardState exception if its not started // yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it // and that's fine, we will ignore it
if (!doOnlyOnRelocating) {
performOnReplica(response, alreadyThreaded, counter, shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnReplica(response, alreadyThreaded, counter, shard, shard.relocatingNodeId());
}
}
}
private void performOnReplica(final Response response, boolean alreadyThreaded, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in // if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover // this case, we don't have to do the operation, and just let it failover
if (shard.unassigned() || !nodes.nodeExists(shard.currentNodeId())) { if (!nodes.nodeExists(nodeId)) {
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
if (alreadyThreaded || !request.listenerThreaded()) { if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response); listener.onResponse(response);
@ -469,20 +504,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
}); });
} }
break;
}
continue;
}
if (!doOnlyOnRelocating) {
performOnReplica(response, counter, shard, shard.currentNodeId());
}
if (shard.relocating()) {
performOnReplica(response, counter, shard, shard.relocatingNodeId());
}
} }
return;
} }
private void performOnReplica(final Response response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
final ShardOperationRequest shardRequest = new ShardOperationRequest(shards.shardId().id(), request); final ShardOperationRequest shardRequest = new ShardOperationRequest(shards.shardId().id(), request);
if (!nodeId.equals(nodes.localNodeId())) { if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId); DiscoveryNode node = nodes.get(nodeId);