Add non-dispatching listenable action future (#24412)

Currently the only implementation of `ListenableActionFuture` requires
dispatching listener execution to a thread pool. This commit renames
that variant to `DispatchingListenableActionFuture` and converts
`AbstractListenableActionFuture` to be a variant that does not require
dispatching. That class is now named `PlainListenableActionFuture`.
This commit is contained in:
Tim Brooks 2017-05-03 10:30:54 -05:00 committed by GitHub
parent 7311aaa2eb
commit 855b64b0ee
5 changed files with 118 additions and 114 deletions

View File

@ -1,106 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
public abstract class AbstractListenableActionFuture<T, L> extends AdapterActionFuture<T, L> implements ListenableActionFuture<T> {
private static final Logger logger = Loggers.getLogger(AbstractListenableActionFuture.class);
final ThreadPool threadPool;
volatile Object listeners;
boolean executedListeners = false;
protected AbstractListenableActionFuture(ThreadPool threadPool) {
this.threadPool = threadPool;
}
public ThreadPool threadPool() {
return threadPool;
}
@Override
public void addListener(final ActionListener<T> listener) {
internalAddListener(listener);
}
public void internalAddListener(ActionListener<T> listener) {
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false);
boolean executeImmediate = false;
synchronized (this) {
if (executedListeners) {
executeImmediate = true;
} else {
Object listeners = this.listeners;
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof List) {
((List) this.listeners).add(listener);
} else {
Object orig = listeners;
listeners = new ArrayList<>(2);
((List) listeners).add(orig);
((List) listeners).add(listener);
}
this.listeners = listeners;
}
}
if (executeImmediate) {
executeListener(listener);
}
}
@Override
protected void done() {
super.done();
synchronized (this) {
executedListeners = true;
}
Object listeners = this.listeners;
if (listeners != null) {
if (listeners instanceof List) {
List list = (List) listeners;
for (Object listener : list) {
executeListener((ActionListener<T>) listener);
}
} else {
executeListener((ActionListener<T>) listeners);
}
}
}
private void executeListener(final ActionListener<T> listener) {
try {
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
// here we know we will never block
listener.onResponse(actionGet(0));
} catch (Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -19,17 +19,120 @@
package org.elasticsearch.action.support; package org.elasticsearch.action.support;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> { import java.util.ArrayList;
import java.util.List;
public PlainListenableActionFuture(ThreadPool threadPool) { public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> implements ListenableActionFuture<T> {
super(threadPool);
volatile Object listeners;
boolean executedListeners = false;
private PlainListenableActionFuture() {}
/**
* This method returns a listenable future. The listeners will be called on completion of the future.
* The listeners will be executed by the same thread that completes the future.
*
* @param <T> the result of the future
* @return a listenable future
*/
public static <T> PlainListenableActionFuture<T> newListenableFuture() {
return new PlainListenableActionFuture<>();
}
/**
* This method returns a listenable future. The listeners will be called on completion of the future.
* The listeners will be executed on the LISTENER thread pool.
* @param threadPool the thread pool used to execute listeners
* @param <T> the result of the future
* @return a listenable future
*/
public static <T> PlainListenableActionFuture<T> newDispatchingListenableFuture(ThreadPool threadPool) {
return new DispatchingListenableActionFuture<>(threadPool);
} }
@Override @Override
protected T convert(T response) { public void addListener(final ActionListener<T> listener) {
return response; internalAddListener(listener);
} }
@Override
protected void done() {
super.done();
synchronized (this) {
executedListeners = true;
}
Object listeners = this.listeners;
if (listeners != null) {
if (listeners instanceof List) {
List list = (List) listeners;
for (Object listener : list) {
executeListener((ActionListener<T>) listener);
}
} else {
executeListener((ActionListener<T>) listeners);
}
}
}
@Override
protected T convert(T listenerResponse) {
return listenerResponse;
}
private void internalAddListener(ActionListener<T> listener) {
boolean executeImmediate = false;
synchronized (this) {
if (executedListeners) {
executeImmediate = true;
} else {
Object listeners = this.listeners;
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof List) {
((List) this.listeners).add(listener);
} else {
Object orig = listeners;
listeners = new ArrayList<>(2);
((List) listeners).add(orig);
((List) listeners).add(listener);
}
this.listeners = listeners;
}
}
if (executeImmediate) {
executeListener(listener);
}
}
private void executeListener(final ActionListener<T> listener) {
try {
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
// here we know we will never block
listener.onResponse(actionGet(0));
} catch (Exception e) {
listener.onFailure(e);
}
}
private static final class DispatchingListenableActionFuture<T> extends PlainListenableActionFuture<T> {
private static final Logger logger = Loggers.getLogger(DispatchingListenableActionFuture.class);
private final ThreadPool threadPool;
private DispatchingListenableActionFuture(ThreadPool threadPool) {
this.threadPool = threadPool;
}
@Override
public void addListener(final ActionListener<T> listener) {
super.addListener(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false));
}
}
} }

View File

@ -34,7 +34,12 @@ public class ListenableActionFutureTests extends ESTestCase {
public void testListenerIsCallableFromNetworkThreads() throws Throwable { public void testListenerIsCallableFromNetworkThreads() throws Throwable {
ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads"); ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads");
try { try {
final PlainListenableActionFuture<Object> future = new PlainListenableActionFuture<>(threadPool); final PlainListenableActionFuture<Object> future;
if (randomBoolean()) {
future = PlainListenableActionFuture.newDispatchingListenableFuture(threadPool);
} else {
future = PlainListenableActionFuture.newListenableFuture();
}
final CountDownLatch listenerCalled = new CountDownLatch(1); final CountDownLatch listenerCalled = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>();
final Object response = new Object(); final Object response = new Object();

View File

@ -91,7 +91,8 @@ public class TransportActionFilterChainTests extends ESTestCase {
} }
} }
PlainActionFuture<TestResponse> future = new PlainActionFuture<>(); PlainActionFuture<TestResponse> future = PlainActionFuture.newFuture();
transportAction.execute(new TestRequest(), future); transportAction.execute(new TestRequest(), future);
try { try {
assertThat(future.get(), notNullValue()); assertThat(future.get(), notNullValue());
@ -104,6 +105,7 @@ public class TransportActionFilterChainTests extends ESTestCase {
for (ActionFilter actionFilter : actionFilters.filters()) { for (ActionFilter actionFilter : actionFilters.filters()) {
testFiltersByLastExecution.add((RequestTestFilter) actionFilter); testFiltersByLastExecution.add((RequestTestFilter) actionFilter);
} }
testFiltersByLastExecution.sort(Comparator.comparingInt(o -> o.executionToken)); testFiltersByLastExecution.sort(Comparator.comparingInt(o -> o.executionToken));
ArrayList<RequestTestFilter> finalTestFilters = new ArrayList<>(); ArrayList<RequestTestFilter> finalTestFilters = new ArrayList<>();

View File

@ -69,7 +69,7 @@ public class TransportClientRetryIT extends ESIntegTestCase {
if (randomBoolean()) { if (randomBoolean()) {
clusterState = client.admin().cluster().state(clusterStateRequest).get().getState(); clusterState = client.admin().cluster().state(clusterStateRequest).get().getState();
} else { } else {
PlainActionFuture<ClusterStateResponse> future = new PlainActionFuture<>(); PlainActionFuture<ClusterStateResponse> future = PlainActionFuture.newFuture();
client.admin().cluster().state(clusterStateRequest, future); client.admin().cluster().state(clusterStateRequest, future);
clusterState = future.get().getState(); clusterState = future.get().getState();
} }