diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 0def25ee10b..05300cb7408 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -49,6 +49,10 @@ pools, but the important ones include: For refresh operations, defaults to `scaling` with a `5m` keep-alive. +`listener`:: + Mainly for java client executing of action when listener threaded is set to true + size `(# of available processors)/2` max at 10. + Changing a specific thread pool can be done by setting its type and specific type parameters, for example, changing the `index` thread pool to have more threads: diff --git a/src/main/java/org/elasticsearch/action/ListenableActionFuture.java b/src/main/java/org/elasticsearch/action/ListenableActionFuture.java index 582d2ee0da4..29b5a2a8774 100644 --- a/src/main/java/org/elasticsearch/action/ListenableActionFuture.java +++ b/src/main/java/org/elasticsearch/action/ListenableActionFuture.java @@ -30,9 +30,4 @@ public interface ListenableActionFuture extends ActionFuture { * Add an action listener to be invoked when a response has received. */ void addListener(final ActionListener listener); - - /** - * Add an action listener (runnable) to be invoked when a response has received. - */ - void addListener(final Runnable listener); } diff --git a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index 925a0bc24c4..b2410f95827 100644 --- a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -63,7 +63,7 @@ public class TransportActionNodeProxy extends AdapterActionFuture implements ListenableActionFuture { final boolean listenerThreaded; - final ThreadPool threadPool; - volatile Object listeners; - boolean executedListeners = false; protected AbstractListenableActionFuture(boolean listenerThreaded, ThreadPool threadPool) { @@ -57,11 +55,7 @@ public abstract class AbstractListenableActionFuture extends AdapterAction internalAddListener(listener); } - public void addListener(final Runnable listener) { - internalAddListener(listener); - } - - public void internalAddListener(Object listener) { + public void internalAddListener(ActionListener listener) { boolean executeImmediate = false; synchronized (this) { if (executedListeners) { @@ -97,42 +91,36 @@ public abstract class AbstractListenableActionFuture extends AdapterAction if (listeners instanceof List) { List list = (List) listeners; for (Object listener : list) { - executeListener(listener); + executeListener((ActionListener) listener); } } else { - executeListener(listeners); + executeListener((ActionListener) listeners); } } } - private void executeListener(final Object listener) { + private void executeListener(final ActionListener listener) { if (listenerThreaded) { - if (listener instanceof Runnable) { - threadPool.generic().execute((Runnable) listener); - } else { - threadPool.generic().execute(new Runnable() { + try { + threadPool.executor(ThreadPool.Names.LISTENER).execute(new Runnable() { @Override public void run() { - ActionListener lst = (ActionListener) listener; try { - lst.onResponse(actionGet()); + listener.onResponse(actionGet()); } catch (ElasticsearchException e) { - lst.onFailure(e); + listener.onFailure(e); } } }); + } catch (EsRejectedExecutionException e) { + listener.onFailure(e); } } else { - if (listener instanceof Runnable) { - ((Runnable) listener).run(); - } else { - ActionListener lst = (ActionListener) listener; - try { - lst.onResponse(actionGet()); - } catch (ElasticsearchException e) { - lst.onFailure(e); - } + try { + listener.onResponse(actionGet()); + } catch (Throwable e) { + listener.onFailure(e); } } } -} +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/support/TransportAction.java b/src/main/java/org/elasticsearch/action/support/TransportAction.java index 9e849fd8b40..4637ba00c58 100644 --- a/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -106,7 +106,7 @@ public abstract class TransportAction