diff --git a/src/main/java/org/elasticsearch/action/support/TransportAction.java b/src/main/java/org/elasticsearch/action/support/TransportAction.java index 32dccae6a7d..199ba944e50 100644 --- a/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -22,7 +22,9 @@ package org.elasticsearch.action.support; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.*; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.action.support.PlainActionFuture.newFuture; @@ -50,7 +52,7 @@ public abstract class TransportAction listener) { if (request.listenerThreaded()) { - listener = new ThreadedActionListener(threadPool, listener); + listener = new ThreadedActionListener(threadPool, listener, logger); } ActionRequestValidationException validationException = request.validate(); if (validationException != null) { @@ -67,39 +69,60 @@ public abstract class TransportAction listener); - static class ThreadedActionListener implements ActionListener { + static final class ThreadedActionListener implements ActionListener { private final ThreadPool threadPool; private final ActionListener listener; - ThreadedActionListener(ThreadPool threadPool, ActionListener listener) { + private final ESLogger logger; + + ThreadedActionListener(ThreadPool threadPool, ActionListener listener, ESLogger logger) { this.threadPool = threadPool; this.listener = listener; + this.logger = logger; } @Override public void onResponse(final Response response) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { - try { - listener.onResponse(response); - } catch (Throwable e) { - listener.onFailure(e); + try { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + try { + listener.onResponse(response); + } catch (Throwable e) { + listener.onFailure(e); + } } - } - }); - } - - @Override - public void onFailure(final Throwable e) { - threadPool.generic().execute(new Runnable() { - @Override - public void run() { + }); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not run threaded action, exectuion rejected [{}] running on current thread", listener); + /* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait + * for the response on the listener side which could be a remote machine so make sure we push it out there.*/ + try { + listener.onResponse(response); + } catch (Throwable e) { listener.onFailure(e); } - }); + } + } + + @Override + public void onFailure(final Throwable e) { + try { + threadPool.generic().execute(new Runnable() { + @Override + public void run() { + listener.onFailure(e); + } + }); + } catch (EsRejectedExecutionException ex) { + logger.debug("Can not run threaded action, exectuion rejected for listener [{}] running on current thread", listener); + /* we don't care if that takes long since we are shutting down. But if we not respond somebody could wait + * for the response on the listener side which could be a remote machine so make sure we push it out there.*/ + listener.onFailure(e); + } } } }