Remove Throwable usage from transport modules (#30845)

Currently nio and netty modules use the CompletableFuture class for
managing listeners. This is unfortunate as that class accepts
Throwable. This commit adds a class CompletableContext that wraps
the CompletableFuture but does not accept Throwable. This allows the
modification of netty and nio logic to no longer handle Throwable.
This commit is contained in:
Tim Brooks 2018-05-24 17:33:29 -06:00 committed by GitHub
parent 5a97423b7a
commit e8b70273c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 127 additions and 67 deletions

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.concurrent;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
/**
* A thread-safe completable context that allows listeners to be attached. This class relies on the
* {@link CompletableFuture} for the concurrency logic. However, it does not accept {@link Throwable} as
* an exceptional result. This allows attaching listeners that only handle {@link Exception}.
*
* @param <T> the result type
*/
public class CompletableContext<T> {
private final CompletableFuture<T> completableFuture = new CompletableFuture<>();
public void addListener(BiConsumer<T, ? super Exception> listener) {
BiConsumer<T, Throwable> castThrowable = (v, t) -> {
if (t == null) {
listener.accept(v, null);
} else {
assert !(t instanceof Error) : "Cannot be error";
listener.accept(v, (Exception) t);
}
};
completableFuture.whenComplete(castThrowable);
}
public boolean isDone() {
return completableFuture.isDone();
}
public boolean isCompletedExceptionally() {
return completableFuture.isCompletedExceptionally();
}
public boolean completeExceptionally(Exception ex) {
return completableFuture.completeExceptionally(ex);
}
public boolean complete(T value) {
return completableFuture.complete(value);
}
}

View File

@ -33,6 +33,8 @@ publishing {
}
dependencies {
compile "org.elasticsearch:elasticsearch-core:${version}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"

View File

@ -28,7 +28,7 @@ public abstract class BytesWriteHandler implements ReadWriteHandler {
private static final List<FlushOperation> EMPTY_LIST = Collections.emptyList();
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
assert message instanceof ByteBuffer[] : "This channel only supports messages that are of type: " + ByteBuffer[].class
+ ". Found type: " + message.getClass() + ".";
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);

View File

@ -19,11 +19,12 @@
package org.elasticsearch.nio;
import org.elasticsearch.common.concurrent.CompletableContext;
import java.io.IOException;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -37,7 +38,7 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
protected final S rawChannel;
private final Consumer<Exception> exceptionHandler;
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private volatile SelectionKey selectionKey;
ChannelContext(S rawChannel, Consumer<Exception> exceptionHandler) {
@ -81,8 +82,8 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
*
* @param listener to be called
*/
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
closeContext.whenComplete(listener);
public void addCloseListener(BiConsumer<Void, Exception> listener) {
closeContext.addListener(listener);
}
public boolean isOpen() {

View File

@ -25,13 +25,13 @@ import java.util.function.BiConsumer;
public class FlushOperation {
private final BiConsumer<Void, Throwable> listener;
private final BiConsumer<Void, Exception> listener;
private final ByteBuffer[] buffers;
private final int[] offsets;
private final int length;
private int internalIndex;
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
this.listener = listener;
this.buffers = buffers;
this.offsets = new int[buffers.length];
@ -44,7 +44,7 @@ public class FlushOperation {
length = offset;
}
public BiConsumer<Void, Throwable> getListener() {
public BiConsumer<Void, Exception> getListener() {
return listener;
}

View File

@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation {
private final SocketChannelContext channelContext;
private final ByteBuffer[] buffers;
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
super(buffers, listener);
this.channelContext = channelContext;
this.buffers = buffers;

View File

@ -53,7 +53,7 @@ public abstract class NioChannel {
*
* @param listener to be called at close
*/
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
public void addCloseListener(BiConsumer<Void, Exception> listener) {
getContext().addCloseListener(listener);
}

View File

@ -60,7 +60,7 @@ public class NioSocketChannel extends NioChannel {
return remoteAddress;
}
public void addConnectListener(BiConsumer<Void, Throwable> listener) {
public void addConnectListener(BiConsumer<Void, Exception> listener) {
context.addConnectListener(listener);
}

View File

@ -38,7 +38,7 @@ public interface ReadWriteHandler {
* @param listener the listener to be called when the message is sent
* @return the write operation to be queued
*/
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener);
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener);
/**
* This method is called on the event loop thread. It should serialize a write operation object to bytes

View File

@ -19,6 +19,7 @@
package org.elasticsearch.nio;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.nio.utils.ExceptionsHelper;
import java.io.IOException;
@ -27,7 +28,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -48,7 +48,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
private final ReadWriteHandler readWriteHandler;
private final SocketSelector selector;
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
private final CompletableContext<Void> connectContext = new CompletableContext<>();
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
private boolean ioException;
private boolean peerClosed;
@ -73,8 +73,8 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
return channel;
}
public void addConnectListener(BiConsumer<Void, Throwable> listener) {
connectContext.whenComplete(listener);
public void addConnectListener(BiConsumer<Void, Exception> listener) {
connectContext.addListener(listener);
}
public boolean isConnectComplete() {
@ -121,7 +121,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
return isConnected;
}
public void sendMessage(Object message, BiConsumer<Void, Throwable> listener) {
public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
if (isClosing.get()) {
listener.accept(null, new ClosedChannelException());
return;

View File

@ -138,7 +138,7 @@ public class SocketSelector extends ESSelector {
* @param listener to be executed
* @param value to provide to listener
*/
public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
@ -154,7 +154,7 @@ public class SocketSelector extends ESSelector {
* @param listener to be executed
* @param exception to provide to listener
*/
public <V> void executeFailedListener(BiConsumer<V, Throwable> listener, Exception exception) {
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);

View File

@ -27,7 +27,7 @@ import java.util.function.BiConsumer;
*/
public interface WriteOperation {
BiConsumer<Void, Throwable> getListener();
BiConsumer<Void, Exception> getListener();
SocketChannelContext getChannel();

View File

@ -45,7 +45,7 @@ public class BytesChannelContextTests extends ESTestCase {
private BytesChannelContext context;
private InboundChannelBuffer channelBuffer;
private SocketSelector selector;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private int messageLength;
@Before
@ -191,7 +191,7 @@ public class BytesChannelContextTests extends ESTestCase {
public void testMultipleWritesPartialFlushes() throws IOException {
assertFalse(context.readyForFlush());
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);
FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class);
when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[0]);

View File

@ -83,7 +83,7 @@ public class ChannelContextTests extends ESTestCase {
if (t == null) {
throw new AssertionError("Close should not fail");
} else {
exception.set((Exception) t);
exception.set(t);
}
});

View File

@ -31,7 +31,7 @@ import static org.mockito.Mockito.mock;
public class FlushOperationTests extends ESTestCase {
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
@Before
@SuppressWarnings("unchecked")

View File

@ -50,7 +50,7 @@ public class SocketChannelContextTests extends ESTestCase {
private TestSocketChannelContext context;
private Consumer<Exception> exceptionHandler;
private NioSocketChannel channel;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private SocketSelector selector;
private ReadWriteHandler readWriteHandler;
@ -125,7 +125,7 @@ public class SocketChannelContextTests extends ESTestCase {
if (t == null) {
throw new AssertionError("Connection should not succeed");
} else {
exception.set((Exception) t);
exception.set(t);
}
});
@ -206,7 +206,7 @@ public class SocketChannelContextTests extends ESTestCase {
ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
WriteOperation writeOperation = mock(WriteOperation.class);
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
new FlushOperation(buffer, listener2)));
context.queueWriteOperation(writeOperation);
@ -232,7 +232,7 @@ public class SocketChannelContextTests extends ESTestCase {
ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
assertFalse(context.readyForFlush());
when(channel.isOpen()).thenReturn(true);

View File

@ -50,7 +50,7 @@ public class SocketSelectorTests extends ESTestCase {
private NioSocketChannel channel;
private TestSelectionKey selectionKey;
private SocketChannelContext channelContext;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
private Selector rawSelector;

View File

@ -20,26 +20,21 @@
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.util.concurrent.CompletableFuture;
public class NettyTcpChannel implements TcpChannel {
private final Channel channel;
private final String profile;
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
private final CompletableContext<Void> closeContext = new CompletableContext<>();
NettyTcpChannel(Channel channel, String profile) {
this.channel = channel;
@ -51,9 +46,9 @@ public class NettyTcpChannel implements TcpChannel {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally(cause);
closeContext.completeExceptionally((Exception) cause);
}
}
});
@ -71,7 +66,7 @@ public class NettyTcpChannel implements TcpChannel {
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override

View File

@ -96,7 +96,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
}
@Override
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: "
+ NioHttpResponse.class + ". Found type: " + message.getClass() + ".";
return new HttpWriteOperation(context, (NioHttpResponse) message, listener);

View File

@ -28,16 +28,16 @@ public class HttpWriteOperation implements WriteOperation {
private final SocketChannelContext channelContext;
private final NioHttpResponse response;
private final BiConsumer<Void, Throwable> listener;
private final BiConsumer<Void, Exception> listener;
HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Throwable> listener) {
HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Exception> listener) {
this.channelContext = channelContext;
this.response = response;
this.listener = listener;
}
@Override
public BiConsumer<Void, Throwable> getListener() {
public BiConsumer<Void, Exception> getListener() {
return listener;
}

View File

@ -36,7 +36,7 @@ import java.util.function.BiConsumer;
* complete that promise when accept is called. It delegates the normal promise methods to the underlying
* promise.
*/
public class NettyListener implements BiConsumer<Void, Throwable>, ChannelPromise {
public class NettyListener implements BiConsumer<Void, Exception>, ChannelPromise {
private final ChannelPromise promise;
@ -45,11 +45,11 @@ public class NettyListener implements BiConsumer<Void, Throwable>, ChannelPromis
}
@Override
public void accept(Void v, Throwable throwable) {
if (throwable == null) {
public void accept(Void v, Exception exception) {
if (exception == null) {
promise.setSuccess();
} else {
promise.setFailure(throwable);
promise.setFailure(exception);
}
}
@ -212,17 +212,22 @@ public class NettyListener implements BiConsumer<Void, Throwable>, ChannelPromis
return promise.unvoid();
}
public static NettyListener fromBiConsumer(BiConsumer<Void, Throwable> biConsumer, Channel channel) {
public static NettyListener fromBiConsumer(BiConsumer<Void, Exception> biConsumer, Channel channel) {
if (biConsumer instanceof NettyListener) {
return (NettyListener) biConsumer;
} else {
ChannelPromise channelPromise = channel.newPromise();
channelPromise.addListener(f -> {
if (f.cause() == null) {
Throwable cause = f.cause();
if (cause == null) {
biConsumer.accept(null, null);
} else {
ExceptionsHelper.dieOnError(f.cause());
biConsumer.accept(null, f.cause());
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
biConsumer.accept(null, new Exception(cause));
} else {
biConsumer.accept(null, (Exception) cause);
}
}
});

View File

@ -120,7 +120,7 @@ public class NioHttpChannel extends AbstractRestChannel {
toClose.add(nioChannel::close);
}
BiConsumer<Void, Throwable> listener = (aVoid, throwable) -> Releasables.close(toClose);
BiConsumer<Void, Exception> listener = (aVoid, ex) -> Releasables.close(toClose);
nioChannel.getContext().sendMessage(new NioHttpResponse(sequence, resp), listener);
success = true;
} finally {

View File

@ -90,18 +90,12 @@ public interface ActionListener<Response> {
* @param <Response> the type of the response
* @return a bi consumer that will complete the wrapped listener
*/
static <Response> BiConsumer<Response, Throwable> toBiConsumer(ActionListener<Response> listener) {
static <Response> BiConsumer<Response, Exception> toBiConsumer(ActionListener<Response> listener) {
return (response, throwable) -> {
if (throwable == null) {
listener.onResponse(response);
} else {
if (throwable instanceof Exception) {
listener.onFailure((Exception) throwable);
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new AssertionError("Should have been either Error or Exception", throwable);
}
listener.onFailure(throwable);
}
};
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.mocksocket.MockSocket;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.BufferedInputStream;
@ -50,7 +51,6 @@ import java.net.SocketException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -218,7 +218,7 @@ public class MockTcpTransport extends TcpTransport {
private final Socket activeChannel;
private final String profile;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final CompletableContext<Void> closeFuture = new CompletableContext<>();
/**
* Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
@ -364,7 +364,7 @@ public class MockTcpTransport extends TcpTransport {
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeFuture.whenComplete(ActionListener.toBiConsumer(listener));
closeFuture.addListener(ActionListener.toBiConsumer(listener));
}
@Override

View File

@ -159,7 +159,7 @@ public final class SSLChannelContext extends SocketChannelContext {
private static class CloseNotifyOperation implements WriteOperation {
private static final BiConsumer<Void, Throwable> LISTENER = (v, t) -> {};
private static final BiConsumer<Void, Exception> LISTENER = (v, t) -> {};
private static final Object WRITE_OBJECT = new Object();
private final SocketChannelContext channelContext;
@ -168,7 +168,7 @@ public final class SSLChannelContext extends SocketChannelContext {
}
@Override
public BiConsumer<Void, Throwable> getListener() {
public BiConsumer<Void, Exception> getListener() {
return LISTENER;
}

View File

@ -41,7 +41,7 @@ public class SSLChannelContextTests extends ESTestCase {
private SSLChannelContext context;
private InboundChannelBuffer channelBuffer;
private SocketSelector selector;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private Consumer exceptionHandler;
private SSLDriver sslDriver;
private ByteBuffer readBuffer = ByteBuffer.allocate(1 << 14);
@ -266,7 +266,7 @@ public class SSLChannelContextTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testMultipleWritesPartialFlushes() throws IOException {
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
ByteBuffer[] buffers1 = {ByteBuffer.allocate(10)};
ByteBuffer[] buffers2 = {ByteBuffer.allocate(5)};
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);