diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index cb237087886..ced572ecd15 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -39,9 +39,11 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -198,7 +200,7 @@ public class TransportClientNodesService extends AbstractComponent { ImmutableList nodes = this.nodes; ensureNodesAreAvailable(nodes); int index = getNodeNumber(); - RetryListener retryListener = new RetryListener<>(callback, listener, nodes, index); + RetryListener retryListener = new RetryListener<>(callback, listener, nodes, index, threadPool, logger); DiscoveryNode node = nodes.get((index) % nodes.size()); try { callback.doWithNode(node, retryListener); @@ -212,15 +214,20 @@ public class TransportClientNodesService extends AbstractComponent { private final NodeListenerCallback callback; private final ActionListener listener; private final ImmutableList nodes; + private final ESLogger logger; private final int index; + private ThreadPool threadPool; private volatile int i; - public RetryListener(NodeListenerCallback callback, ActionListener listener, ImmutableList nodes, int index) { + public RetryListener(NodeListenerCallback callback, ActionListener listener, ImmutableList nodes, + int index, ThreadPool threadPool, ESLogger logger) { this.callback = callback; this.listener = listener; this.nodes = nodes; this.index = index; + this.threadPool = threadPool; + this.logger = logger; } @Override @@ -233,19 +240,38 @@ public class TransportClientNodesService extends AbstractComponent { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { int i = ++this.i; if (i >= nodes.size()) { - listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e)); + runFailureInListenerThreadPool(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e)); } else { try { callback.doWithNode(nodes.get((index + i) % nodes.size()), this); - } catch(Throwable t) { - //this exception can't come from the TransportService as it doesn't throw exceptions at all - listener.onFailure(t); + } catch(final Throwable t) { + // this exception can't come from the TransportService as it doesn't throw exceptions at all + runFailureInListenerThreadPool(t); } } } else { - listener.onFailure(e); + runFailureInListenerThreadPool(e); } } + + // need to ensure to not block the netty I/O thread, in case of retry due to the node sampling + private void runFailureInListenerThreadPool(final Throwable t) { + threadPool.executor(ThreadPool.Names.LISTENER).execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + listener.onFailure(t); + } + + @Override + public void onFailure(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Could not execute failure listener: [{}]", t, t.getMessage()); + } else { + logger.error("Could not execute failure listener: [{}]", t.getMessage()); + } + } + }); + } } public void close() {