TransportClient: Ensure netty I/O thread is not blocked

Whenever a transport client executes a request, it uses a built-in
RetryListener which tries to execute the request on another node.

However, if a connection error occurs, the onFailure() callback of
the listener is triggered, the netty I/O thread might still be used
to whatever failure has been added.

This commit offloads the onFailure handling to the generic thread pool.
This commit is contained in:
Alexander Reelsen 2015-04-26 21:31:36 +02:00
parent fe331b57b7
commit 91e2bb193c

View File

@ -39,9 +39,11 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
@ -198,7 +200,7 @@ public class TransportClientNodesService extends AbstractComponent {
ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, threadPool, logger);
DiscoveryNode node = nodes.get((index) % nodes.size());
try {
callback.doWithNode(node, retryListener);
@ -212,15 +214,20 @@ public class TransportClientNodesService extends AbstractComponent {
private final NodeListenerCallback<Response> callback;
private final ActionListener<Response> listener;
private final ImmutableList<DiscoveryNode> nodes;
private final ESLogger logger;
private final int index;
private ThreadPool threadPool;
private volatile int i;
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes,
int index, ThreadPool threadPool, ESLogger logger) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
this.index = index;
this.threadPool = threadPool;
this.logger = logger;
}
@Override
@ -233,19 +240,38 @@ public class TransportClientNodesService extends AbstractComponent {
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
int i = ++this.i;
if (i >= nodes.size()) {
listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
runFailureInListenerThreadPool(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch(Throwable t) {
//this exception can't come from the TransportService as it doesn't throw exceptions at all
listener.onFailure(t);
} catch(final Throwable t) {
// this exception can't come from the TransportService as it doesn't throw exceptions at all
runFailureInListenerThreadPool(t);
}
}
} else {
listener.onFailure(e);
runFailureInListenerThreadPool(e);
}
}
// need to ensure to not block the netty I/O thread, in case of retry due to the node sampling
private void runFailureInListenerThreadPool(final Throwable t) {
threadPool.executor(ThreadPool.Names.LISTENER).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
listener.onFailure(t);
}
@Override
public void onFailure(Throwable t) {
if (logger.isDebugEnabled()) {
logger.debug("Could not execute failure listener: [{}]", t, t.getMessage());
} else {
logger.error("Could not execute failure listener: [{}]", t.getMessage());
}
}
});
}
}
public void close() {