handle cases where the node does not exists in the disco nodes

This commit is contained in:
kimchy 2010-06-22 10:59:24 +03:00
parent 7c931f34fa
commit 622d4041b8
3 changed files with 37 additions and 25 deletions

View File

@ -183,6 +183,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
onFirstPhaseResult(shard, shardIt, null); onFirstPhaseResult(shard, shardIt, null);
} else { } else {
DiscoveryNode node = nodes.get(shard.currentNodeId()); DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
onFirstPhaseResult(shard, shardIt, null);
} else {
sendExecuteFirstPhase(node, internalSearchRequest(shard, request), new SearchServiceListener<FirstResult>() { sendExecuteFirstPhase(node, internalSearchRequest(shard, request), new SearchServiceListener<FirstResult>() {
@Override public void onResult(FirstResult result) { @Override public void onResult(FirstResult result) {
onFirstPhaseResult(shard, result, shardIt); onFirstPhaseResult(shard, result, shardIt);
@ -194,6 +197,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}); });
} }
} }
}
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) { private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) {
processFirstPhaseResult(shard, result); processFirstPhaseResult(shard, result);

View File

@ -221,6 +221,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
} else { } else {
DiscoveryNode node = nodes.get(shard.currentNodeId()); DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, null, false);
} else {
transportService.sendRequest(node, transportShardAction(), shardRequest, new BaseTransportResponseHandler<ShardResponse>() { transportService.sendRequest(node, transportShardAction(), shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override public ShardResponse newInstance() { @Override public ShardResponse newInstance() {
return newShardResponse(); return newShardResponse();
@ -242,6 +246,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
} }
} }
}
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) { private void onOperation(ShardRouting shard, ShardResponse response, boolean alreadyThreaded) {

View File

@ -246,7 +246,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
boolean foundPrimary = false; boolean foundPrimary = false;
for (final ShardRouting shard : shards) { for (final ShardRouting shard : shards) {
if (shard.primary()) { if (shard.primary()) {
if (!shard.active()) { if (!shard.active() || !nodes.nodeExists(shard.currentNodeId())) {
retryPrimary(fromClusterEvent, shard); retryPrimary(fromClusterEvent, shard);
return false; return false;
} }
@ -403,7 +403,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we index on a backup that is initializing as well since we might not have got the event // we index on a backup that is initializing as well since we might not have got the event
// yet that it was started. We will get an exception IllegalShardState exception if its not started // yet that it was started. We will get an exception IllegalShardState exception if its not started
// and that's fine, we will ignore it // and that's fine, we will ignore it
if (shard.unassigned()) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (shard.unassigned() || !nodes.nodeExists(shard.currentNodeId())) {
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
if (alreadyThreaded || !request.listenerThreaded()) { if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response); listener.onResponse(response);