TransportClient: Improve logging, fix minor issue

In order to return more information to the client, in case a TransportClient
can not connect to the cluster, this commit adds logging and also returns the
configured nodes in the NoNodeAvailableException

Also a minor bug has been fixed, which propagated exceptions wrong, so that an
invalid request was actually tried on every node, if a regular connection failure
on the first node had happened.

Closes #6376
This commit is contained in:
Alexander Reelsen 2014-06-02 13:40:44 +02:00
parent 38be1e0dde
commit d3dc158458
2 changed files with 42 additions and 23 deletions

View File

@ -27,8 +27,12 @@ import org.elasticsearch.rest.RestStatus;
*/
public class NoNodeAvailableException extends ElasticsearchException {
public NoNodeAvailableException() {
super("No node available");
public NoNodeAvailableException(String message) {
super(message);
}
public NoNodeAvailableException(String message, Throwable t) {
super(message, t);
}
@Override

View File

@ -192,40 +192,31 @@ public class TransportClientNodesService extends AbstractComponent {
public <T> T execute(NodeCallback<T> callback) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
if (nodes.isEmpty()) {
throw new NoNodeAvailableException();
}
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
for (int i = 0; i < nodes.size(); i++) {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ElasticsearchException e) {
if (!(e.unwrapCause() instanceof ConnectTransportException)) {
if (e.unwrapCause() instanceof ConnectTransportException) {
logConnectTransportException((ConnectTransportException) e.unwrapCause());
} else {
throw e;
}
}
}
throw new NoNodeAvailableException();
throw new NoNodeAvailableException("None of the configured nodes were available: " + nodes);
}
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
if (nodes.isEmpty()) {
throw new NoNodeAvailableException();
}
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
DiscoveryNode node = nodes.get((index) % nodes.size());
try {
callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
callback.doWithNode(node, retryListener);
} catch (ElasticsearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
retryListener.onFailure(e);
@ -260,13 +251,13 @@ public class TransportClientNodesService extends AbstractComponent {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException());
listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch (Throwable e1) {
// retry the next one...
onFailure(e);
onFailure(e1);
}
}
} else {
@ -292,6 +283,30 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
private int getNodeNumber() {
int index = randomNodeGenerator.incrementAndGet();
if (index < 0) {
index = 0;
randomNodeGenerator.set(0);
}
return index;
}
private void ensureNodesAreAvailable(ImmutableList<DiscoveryNode> nodes) {
if (nodes.isEmpty()) {
String message = String.format(Locale.ROOT, "None of the configured nodes are available: %s", nodes);
throw new NoNodeAvailableException(message);
}
}
private void logConnectTransportException(ConnectTransportException connectTransportException) {
if (logger.isTraceEnabled()) {
logger.trace("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException, connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
} else {
logger.debug("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
}
}
abstract class NodeSampler {
public void sample() {
synchronized (mutex) {