Use PlainListenableActionFuture for CloseFuture (#26242)

Right now we use a custom future for the CloseFuture associated with a
channel. This is because we need special unwrapping logic to ensure that
exceptions from a future failure are a certain type (opposed to an
UncategorizedException). However, the current version is limiting
because we can only attach one listener.

This commit changes the CloseFuture to extend the
PlainListenableActionFuture. This change allows us to attach multiple
listeners.
This commit is contained in:
Tim Brooks 2017-08-18 13:38:38 -05:00 committed by GitHub
parent 6eef6c4f7a
commit 5d7a78fcdb
9 changed files with 30 additions and 47 deletions

View File

@ -45,7 +45,7 @@ public interface ActionListener<Response> {
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onResponse the checked consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received

View File

@ -33,7 +33,7 @@ public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> im
volatile Object listeners;
boolean executedListeners = false;
private PlainListenableActionFuture() {}
protected PlainListenableActionFuture() {}
/**
* This method returns a listenable future. The listeners will be called on completion of the future.

View File

@ -136,10 +136,6 @@ public class NioClient {
for (final NioSocketChannel socketChannel : connections) {
try {
socketChannel.closeAsync().awaitClose();
} catch (InterruptedException inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);
Thread.currentThread().interrupt();
} catch (Exception inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);

View File

@ -119,11 +119,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
for (CloseFuture future : futures) {
try {
future.awaitClose();
IOException closeException = future.getCloseException();
if (closeException != null) {
closingExceptions = addClosingException(closingExceptions, closeException);
}
} catch (InterruptedException e) {
} catch (Exception e) {
closingExceptions = addClosingException(closingExceptions, e);
}
}

View File

@ -113,7 +113,7 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
closeRawChannel();
closedOnThisCall = closeFuture.channelClosed(this);
} catch (IOException e) {
closedOnThisCall = closeFuture.channelCloseThrewException(this, e);
closedOnThisCall = closeFuture.channelCloseThrewException(e);
} finally {
if (closedOnThisCall) {
selector.removeRegisteredChannel(this);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio.channel;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.AcceptingSelector;
@ -55,7 +56,7 @@ public class ChannelFactory {
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}
@ -65,7 +66,7 @@ public class ChannelFactory {
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel);
NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}

View File

@ -19,35 +19,37 @@
package org.elasticsearch.transport.nio.channel;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
public class CloseFuture extends BaseFuture<NioChannel> {
private final SetOnce<Consumer<NioChannel>> listener = new SetOnce<>();
public class CloseFuture extends PlainListenableActionFuture<NioChannel> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel close future");
}
public void awaitClose() throws InterruptedException, IOException {
public void awaitClose() throws IOException {
try {
super.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
}
public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, IOException {
public void awaitClose(long timeout, TimeUnit unit) throws TimeoutException, IOException {
try {
super.get(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
@ -76,31 +78,13 @@ public class CloseFuture extends BaseFuture<NioChannel> {
return super.isDone();
}
public void setListener(Consumer<NioChannel> listener) {
this.listener.set(listener);
}
boolean channelClosed(NioChannel channel) {
boolean set = set(channel);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
return set(channel);
}
boolean channelCloseThrewException(NioChannel channel, IOException ex) {
boolean set = setException(ex);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
boolean channelCloseThrewException(IOException ex) {
return setException(ex);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.nio.channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.AcceptorEventHandler;
@ -33,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.mockito.Mockito.mock;
@ -64,10 +66,11 @@ public class NioServerSocketChannelTests extends ESTestCase {
CountDownLatch latch = new CountDownLatch(1);
NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector);
channel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
channel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel)));
CloseFuture closeFuture = channel.getCloseFuture();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.nio.channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.SocketEventHandler;
import org.elasticsearch.transport.nio.SocketSelector;
@ -34,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
@ -67,10 +69,11 @@ public class NioSocketChannelTests extends ESTestCase {
NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector);
socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class));
socketChannel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel)));
CloseFuture closeFuture = socketChannel.getCloseFuture();
assertFalse(closeFuture.isClosed());