when fixing #1229, we should also handle a case where the node is closing when connected from another node

This commit is contained in:
Shay Banon 2011-08-11 22:35:55 +03:00
parent a4339d6751
commit 3202af0dc1
3 changed files with 21 additions and 8 deletions

View File

@ -136,7 +136,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]"); failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
} else { } else {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to get [{}]", failure, request); logger.debug(shardIt.shardId() + ": Failed to execute [{}]", failure, request);
} }
} }
listener.onFailure(failure); listener.onFailure(failure);

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport; package org.elasticsearch.client.transport;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
@ -152,8 +153,10 @@ public class TransportClientNodesService extends AbstractComponent {
DiscoveryNode node = nodes.get((index + i) % nodes.size()); DiscoveryNode node = nodes.get((index + i) % nodes.size());
try { try {
return callback.doWithNode(node); return callback.doWithNode(node);
} catch (ConnectTransportException e) { } catch (ElasticSearchException e) {
// retry in this case if (!(e.unwrapCause() instanceof ConnectTransportException)) {
throw e;
}
} }
} }
throw new NoNodeAvailableException(); throw new NoNodeAvailableException();
@ -172,8 +175,12 @@ public class TransportClientNodesService extends AbstractComponent {
RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index); RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index);
try { try {
callback.doWithNode(nodes.get((index) % nodes.size()), retryListener); callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
} catch (ConnectTransportException e) { } catch (ElasticSearchException e) {
retryListener.onFailure(e); if (e.unwrapCause() instanceof ConnectTransportException) {
retryListener.onFailure(e);
} else {
throw e;
}
} }
} }
@ -197,7 +204,7 @@ public class TransportClientNodesService extends AbstractComponent {
} }
@Override public void onFailure(Throwable e) { @Override public void onFailure(Throwable e) {
if (e instanceof ConnectTransportException) { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i; int i = ++this.i;
if (i == nodes.size()) { if (i == nodes.size()) {
listener.onFailure(new NoNodeAvailableException()); listener.onFailure(new NoNodeAvailableException());

View File

@ -187,9 +187,15 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
Set<String> resolvedNodesIds = new HashSet<String>(nodesIds.length); Set<String> resolvedNodesIds = new HashSet<String>(nodesIds.length);
for (String nodeId : nodesIds) { for (String nodeId : nodesIds) {
if (nodeId.equals("_local")) { if (nodeId.equals("_local")) {
resolvedNodesIds.add(localNodeId()); String localNodeId = localNodeId();
if (localNodeId != null) {
resolvedNodesIds.add(localNodeId);
}
} else if (nodeId.equals("_master")) { } else if (nodeId.equals("_master")) {
resolvedNodesIds.add(masterNodeId()); String masterNodeId = masterNodeId();
if (masterNodeId != null) {
resolvedNodesIds.add(masterNodeId);
}
} else if (nodeExists(nodeId)) { } else if (nodeExists(nodeId)) {
resolvedNodesIds.add(nodeId); resolvedNodesIds.add(nodeId);
} else { } else {