diff --git a/core/src/main/java/org/elasticsearch/action/support/AbstractListenableActionFuture.java b/core/src/main/java/org/elasticsearch/action/support/AbstractListenableActionFuture.java deleted file mode 100644 index d6e06613d59..00000000000 --- a/core/src/main/java/org/elasticsearch/action/support/AbstractListenableActionFuture.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.support; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.ArrayList; -import java.util.List; - -public abstract class AbstractListenableActionFuture extends AdapterActionFuture implements ListenableActionFuture { - - private static final Logger logger = Loggers.getLogger(AbstractListenableActionFuture.class); - - final ThreadPool threadPool; - volatile Object listeners; - boolean executedListeners = false; - - protected AbstractListenableActionFuture(ThreadPool threadPool) { - this.threadPool = threadPool; - } - - public ThreadPool threadPool() { - return threadPool; - } - - @Override - public void addListener(final ActionListener listener) { - internalAddListener(listener); - } - - public void internalAddListener(ActionListener listener) { - listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false); - boolean executeImmediate = false; - synchronized (this) { - if (executedListeners) { - executeImmediate = true; - } else { - Object listeners = this.listeners; - if (listeners == null) { - listeners = listener; - } else if (listeners instanceof List) { - ((List) this.listeners).add(listener); - } else { - Object orig = listeners; - listeners = new ArrayList<>(2); - ((List) listeners).add(orig); - ((List) listeners).add(listener); - } - this.listeners = listeners; - } - } - if (executeImmediate) { - executeListener(listener); - } - } - - @Override - protected void done() { - super.done(); - synchronized (this) { - executedListeners = true; - } - Object listeners = this.listeners; - if (listeners != null) { - if (listeners instanceof List) { - List list = (List) listeners; - for (Object listener : list) { - executeListener((ActionListener) listener); - } - } else { - executeListener((ActionListener) listeners); - } - } - } - - private void executeListener(final ActionListener listener) { - try { - // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. - // here we know we will never block - listener.onResponse(actionGet(0)); - } catch (Exception e) { - listener.onFailure(e); - } - } -} diff --git a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index c9b0cf9d82f..749bf1fea01 100644 --- a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -19,17 +19,120 @@ package org.elasticsearch.action.support; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.threadpool.ThreadPool; -public class PlainListenableActionFuture extends AbstractListenableActionFuture { +import java.util.ArrayList; +import java.util.List; - public PlainListenableActionFuture(ThreadPool threadPool) { - super(threadPool); +public class PlainListenableActionFuture extends AdapterActionFuture implements ListenableActionFuture { + + volatile Object listeners; + boolean executedListeners = false; + + private PlainListenableActionFuture() {} + + /** + * This method returns a listenable future. The listeners will be called on completion of the future. + * The listeners will be executed by the same thread that completes the future. + * + * @param the result of the future + * @return a listenable future + */ + public static PlainListenableActionFuture newListenableFuture() { + return new PlainListenableActionFuture<>(); + } + + /** + * This method returns a listenable future. The listeners will be called on completion of the future. + * The listeners will be executed on the LISTENER thread pool. + * @param threadPool the thread pool used to execute listeners + * @param the result of the future + * @return a listenable future + */ + public static PlainListenableActionFuture newDispatchingListenableFuture(ThreadPool threadPool) { + return new DispatchingListenableActionFuture<>(threadPool); } @Override - protected T convert(T response) { - return response; + public void addListener(final ActionListener listener) { + internalAddListener(listener); } + @Override + protected void done() { + super.done(); + synchronized (this) { + executedListeners = true; + } + Object listeners = this.listeners; + if (listeners != null) { + if (listeners instanceof List) { + List list = (List) listeners; + for (Object listener : list) { + executeListener((ActionListener) listener); + } + } else { + executeListener((ActionListener) listeners); + } + } + } + + @Override + protected T convert(T listenerResponse) { + return listenerResponse; + } + + private void internalAddListener(ActionListener listener) { + boolean executeImmediate = false; + synchronized (this) { + if (executedListeners) { + executeImmediate = true; + } else { + Object listeners = this.listeners; + if (listeners == null) { + listeners = listener; + } else if (listeners instanceof List) { + ((List) this.listeners).add(listener); + } else { + Object orig = listeners; + listeners = new ArrayList<>(2); + ((List) listeners).add(orig); + ((List) listeners).add(listener); + } + this.listeners = listeners; + } + } + if (executeImmediate) { + executeListener(listener); + } + } + + private void executeListener(final ActionListener listener) { + try { + // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. + // here we know we will never block + listener.onResponse(actionGet(0)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private static final class DispatchingListenableActionFuture extends PlainListenableActionFuture { + + private static final Logger logger = Loggers.getLogger(DispatchingListenableActionFuture.class); + private final ThreadPool threadPool; + + private DispatchingListenableActionFuture(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public void addListener(final ActionListener listener) { + super.addListener(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false)); + } + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java b/core/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java index 8169a674bed..0dbf4603450 100644 --- a/core/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java @@ -34,7 +34,12 @@ public class ListenableActionFutureTests extends ESTestCase { public void testListenerIsCallableFromNetworkThreads() throws Throwable { ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads"); try { - final PlainListenableActionFuture future = new PlainListenableActionFuture<>(threadPool); + final PlainListenableActionFuture future; + if (randomBoolean()) { + future = PlainListenableActionFuture.newDispatchingListenableFuture(threadPool); + } else { + future = PlainListenableActionFuture.newListenableFuture(); + } final CountDownLatch listenerCalled = new CountDownLatch(1); final AtomicReference error = new AtomicReference<>(); final Object response = new Object(); diff --git a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index d5aab07f247..3eb1616348d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -91,7 +91,8 @@ public class TransportActionFilterChainTests extends ESTestCase { } } - PlainActionFuture future = new PlainActionFuture<>(); + PlainActionFuture future = PlainActionFuture.newFuture(); + transportAction.execute(new TestRequest(), future); try { assertThat(future.get(), notNullValue()); @@ -104,6 +105,7 @@ public class TransportActionFilterChainTests extends ESTestCase { for (ActionFilter actionFilter : actionFilters.filters()) { testFiltersByLastExecution.add((RequestTestFilter) actionFilter); } + testFiltersByLastExecution.sort(Comparator.comparingInt(o -> o.executionToken)); ArrayList finalTestFilters = new ArrayList<>(); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java index 899d1b0ce4a..0247d8f343f 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java @@ -69,7 +69,7 @@ public class TransportClientRetryIT extends ESIntegTestCase { if (randomBoolean()) { clusterState = client.admin().cluster().state(clusterStateRequest).get().getState(); } else { - PlainActionFuture future = new PlainActionFuture<>(); + PlainActionFuture future = PlainActionFuture.newFuture(); client.admin().cluster().state(clusterStateRequest, future); clusterState = future.get().getState(); }