diff --git a/libs/elasticsearch-core/src/main/java/org/elasticsearch/common/concurrent/CompletableContext.java b/libs/elasticsearch-core/src/main/java/org/elasticsearch/common/concurrent/CompletableContext.java new file mode 100644 index 00000000000..e8388429318 --- /dev/null +++ b/libs/elasticsearch-core/src/main/java/org/elasticsearch/common/concurrent/CompletableContext.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.concurrent; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +/** + * A thread-safe completable context that allows listeners to be attached. This class relies on the + * {@link CompletableFuture} for the concurrency logic. However, it does not accept {@link Throwable} as + * an exceptional result. This allows attaching listeners that only handle {@link Exception}. + * + * @param the result type + */ +public class CompletableContext { + + private final CompletableFuture completableFuture = new CompletableFuture<>(); + + public void addListener(BiConsumer listener) { + BiConsumer castThrowable = (v, t) -> { + if (t == null) { + listener.accept(v, null); + } else { + assert !(t instanceof Error) : "Cannot be error"; + listener.accept(v, (Exception) t); + } + }; + completableFuture.whenComplete(castThrowable); + } + + public boolean isDone() { + return completableFuture.isDone(); + } + + public boolean isCompletedExceptionally() { + return completableFuture.isCompletedExceptionally(); + } + + public boolean completeExceptionally(Exception ex) { + return completableFuture.completeExceptionally(ex); + } + + public boolean complete(T value) { + return completableFuture.complete(value); + } +} diff --git a/libs/elasticsearch-nio/build.gradle b/libs/elasticsearch-nio/build.gradle index 018874adf70..f8b0b8fba13 100644 --- a/libs/elasticsearch-nio/build.gradle +++ b/libs/elasticsearch-nio/build.gradle @@ -33,6 +33,8 @@ publishing { } dependencies { + compile "org.elasticsearch:elasticsearch-core:${version}" + testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java index ba379e28732..87c0ff2817e 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java @@ -28,7 +28,7 @@ public abstract class BytesWriteHandler implements ReadWriteHandler { private static final List EMPTY_LIST = Collections.emptyList(); - public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { + public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { assert message instanceof ByteBuffer[] : "This channel only supports messages that are of type: " + ByteBuffer[].class + ". Found type: " + message.getClass() + "."; return new FlushReadyWrite(context, (ByteBuffer[]) message, listener); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java index 01f35347aa4..93930bbabf0 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelContext.java @@ -19,11 +19,12 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.concurrent.CompletableContext; + import java.io.IOException; import java.nio.channels.NetworkChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; -import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -37,7 +38,7 @@ public abstract class ChannelContext exceptionHandler; - private final CompletableFuture closeContext = new CompletableFuture<>(); + private final CompletableContext closeContext = new CompletableContext<>(); private volatile SelectionKey selectionKey; ChannelContext(S rawChannel, Consumer exceptionHandler) { @@ -81,8 +82,8 @@ public abstract class ChannelContext listener) { - closeContext.whenComplete(listener); + public void addCloseListener(BiConsumer listener) { + closeContext.addListener(listener); } public boolean isOpen() { diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushOperation.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushOperation.java index 3102c972a67..7a1696483db 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushOperation.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushOperation.java @@ -25,13 +25,13 @@ import java.util.function.BiConsumer; public class FlushOperation { - private final BiConsumer listener; + private final BiConsumer listener; private final ByteBuffer[] buffers; private final int[] offsets; private final int length; private int internalIndex; - public FlushOperation(ByteBuffer[] buffers, BiConsumer listener) { + public FlushOperation(ByteBuffer[] buffers, BiConsumer listener) { this.listener = listener; this.buffers = buffers; this.offsets = new int[buffers.length]; @@ -44,7 +44,7 @@ public class FlushOperation { length = offset; } - public BiConsumer getListener() { + public BiConsumer getListener() { return listener; } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java index 65bc8f17aaf..61c997603ff 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java @@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation { private final SocketChannelContext channelContext; private final ByteBuffer[] buffers; - FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer listener) { + FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer listener) { super(buffers, listener); this.channelContext = channelContext; this.buffers = buffers; diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java index 2f9705f5f8f..ea633bd3276 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java @@ -53,7 +53,7 @@ public abstract class NioChannel { * * @param listener to be called at close */ - public void addCloseListener(BiConsumer listener) { + public void addCloseListener(BiConsumer listener) { getContext().addCloseListener(listener); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java index 1b8f11e73d4..32e93476691 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java @@ -60,7 +60,7 @@ public class NioSocketChannel extends NioChannel { return remoteAddress; } - public void addConnectListener(BiConsumer listener) { + public void addConnectListener(BiConsumer listener) { context.addConnectListener(listener); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java index f0637ea2652..6b8688eccfd 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadWriteHandler.java @@ -38,7 +38,7 @@ public interface ReadWriteHandler { * @param listener the listener to be called when the message is sent * @return the write operation to be queued */ - WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener); + WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener); /** * This method is called on the event loop thread. It should serialize a write operation object to bytes diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index f2d299a9d32..6a769b4d173 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -19,6 +19,7 @@ package org.elasticsearch.nio; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.nio.utils.ExceptionsHelper; import java.io.IOException; @@ -27,7 +28,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.LinkedList; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -48,7 +48,7 @@ public abstract class SocketChannelContext extends ChannelContext protected final AtomicBoolean isClosing = new AtomicBoolean(false); private final ReadWriteHandler readWriteHandler; private final SocketSelector selector; - private final CompletableFuture connectContext = new CompletableFuture<>(); + private final CompletableContext connectContext = new CompletableContext<>(); private final LinkedList pendingFlushes = new LinkedList<>(); private boolean ioException; private boolean peerClosed; @@ -73,8 +73,8 @@ public abstract class SocketChannelContext extends ChannelContext return channel; } - public void addConnectListener(BiConsumer listener) { - connectContext.whenComplete(listener); + public void addConnectListener(BiConsumer listener) { + connectContext.addListener(listener); } public boolean isConnectComplete() { @@ -121,7 +121,7 @@ public abstract class SocketChannelContext extends ChannelContext return isConnected; } - public void sendMessage(Object message, BiConsumer listener) { + public void sendMessage(Object message, BiConsumer listener) { if (isClosing.get()) { listener.accept(null, new ClosedChannelException()); return; diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java index 88b3cef41cd..30ef7b317a3 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java @@ -138,7 +138,7 @@ public class SocketSelector extends ESSelector { * @param listener to be executed * @param value to provide to listener */ - public void executeListener(BiConsumer listener, V value) { + public void executeListener(BiConsumer listener, V value) { assertOnSelectorThread(); try { listener.accept(value, null); @@ -154,7 +154,7 @@ public class SocketSelector extends ESSelector { * @param listener to be executed * @param exception to provide to listener */ - public void executeFailedListener(BiConsumer listener, Exception exception) { + public void executeFailedListener(BiConsumer listener, Exception exception) { assertOnSelectorThread(); try { listener.accept(null, exception); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java index 25de6ab7326..3d17519be7e 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java @@ -27,7 +27,7 @@ import java.util.function.BiConsumer; */ public interface WriteOperation { - BiConsumer getListener(); + BiConsumer getListener(); SocketChannelContext getChannel(); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java index addfcdedbf9..e5c236e48a8 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java @@ -45,7 +45,7 @@ public class BytesChannelContextTests extends ESTestCase { private BytesChannelContext context; private InboundChannelBuffer channelBuffer; private SocketSelector selector; - private BiConsumer listener; + private BiConsumer listener; private int messageLength; @Before @@ -191,7 +191,7 @@ public class BytesChannelContextTests extends ESTestCase { public void testMultipleWritesPartialFlushes() throws IOException { assertFalse(context.readyForFlush()); - BiConsumer listener2 = mock(BiConsumer.class); + BiConsumer listener2 = mock(BiConsumer.class); FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class); FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class); when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[0]); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java index f262dd06330..586dae83d08 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelContextTests.java @@ -83,7 +83,7 @@ public class ChannelContextTests extends ESTestCase { if (t == null) { throw new AssertionError("Close should not fail"); } else { - exception.set((Exception) t); + exception.set(t); } }); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java index a244de51f35..0f3078715fd 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java @@ -31,7 +31,7 @@ import static org.mockito.Mockito.mock; public class FlushOperationTests extends ESTestCase { - private BiConsumer listener; + private BiConsumer listener; @Before @SuppressWarnings("unchecked") diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java index d6787f7cc15..f27052ac5d5 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java @@ -50,7 +50,7 @@ public class SocketChannelContextTests extends ESTestCase { private TestSocketChannelContext context; private Consumer exceptionHandler; private NioSocketChannel channel; - private BiConsumer listener; + private BiConsumer listener; private SocketSelector selector; private ReadWriteHandler readWriteHandler; @@ -125,7 +125,7 @@ public class SocketChannelContextTests extends ESTestCase { if (t == null) { throw new AssertionError("Connection should not succeed"); } else { - exception.set((Exception) t); + exception.set(t); } }); @@ -206,7 +206,7 @@ public class SocketChannelContextTests extends ESTestCase { ByteBuffer[] buffer = {ByteBuffer.allocate(10)}; WriteOperation writeOperation = mock(WriteOperation.class); - BiConsumer listener2 = mock(BiConsumer.class); + BiConsumer listener2 = mock(BiConsumer.class); when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener), new FlushOperation(buffer, listener2))); context.queueWriteOperation(writeOperation); @@ -232,7 +232,7 @@ public class SocketChannelContextTests extends ESTestCase { ByteBuffer[] buffer = {ByteBuffer.allocate(10)}; - BiConsumer listener2 = mock(BiConsumer.class); + BiConsumer listener2 = mock(BiConsumer.class); assertFalse(context.readyForFlush()); when(channel.isOpen()).thenReturn(true); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java index 78911f20289..f8775d03b42 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java @@ -50,7 +50,7 @@ public class SocketSelectorTests extends ESTestCase { private NioSocketChannel channel; private TestSelectionKey selectionKey; private SocketChannelContext channelContext; - private BiConsumer listener; + private BiConsumer listener; private ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; private Selector rawSelector; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index 602835b5ca2..f650e757e7a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -20,26 +20,21 @@ package org.elasticsearch.transport.netty4; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPromise; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; -import java.nio.channels.ClosedSelectorException; -import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { private final Channel channel; private final String profile; - private final CompletableFuture closeContext = new CompletableFuture<>(); + private final CompletableContext closeContext = new CompletableContext<>(); NettyTcpChannel(Channel channel, String profile) { this.channel = channel; @@ -51,9 +46,9 @@ public class NettyTcpChannel implements TcpChannel { Throwable cause = f.cause(); if (cause instanceof Error) { Netty4Utils.maybeDie(cause); - closeContext.completeExceptionally(cause); + closeContext.completeExceptionally(new Exception(cause)); } else { - closeContext.completeExceptionally(cause); + closeContext.completeExceptionally((Exception) cause); } } }); @@ -71,7 +66,7 @@ public class NettyTcpChannel implements TcpChannel { @Override public void addCloseListener(ActionListener listener) { - closeContext.whenComplete(ActionListener.toBiConsumer(listener)); + closeContext.addListener(ActionListener.toBiConsumer(listener)); } @Override diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index e3481e3c254..681736a311d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -96,7 +96,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler { } @Override - public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { + public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: " + NioHttpResponse.class + ". Found type: " + message.getClass() + "."; return new HttpWriteOperation(context, (NioHttpResponse) message, listener); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java index 8ddce7a5b73..207843bfe39 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java @@ -28,16 +28,16 @@ public class HttpWriteOperation implements WriteOperation { private final SocketChannelContext channelContext; private final NioHttpResponse response; - private final BiConsumer listener; + private final BiConsumer listener; - HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer listener) { + HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer listener) { this.channelContext = channelContext; this.response = response; this.listener = listener; } @Override - public BiConsumer getListener() { + public BiConsumer getListener() { return listener; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java index b907c0f2bc6..2cdaa4708d1 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java @@ -36,7 +36,7 @@ import java.util.function.BiConsumer; * complete that promise when accept is called. It delegates the normal promise methods to the underlying * promise. */ -public class NettyListener implements BiConsumer, ChannelPromise { +public class NettyListener implements BiConsumer, ChannelPromise { private final ChannelPromise promise; @@ -45,11 +45,11 @@ public class NettyListener implements BiConsumer, ChannelPromis } @Override - public void accept(Void v, Throwable throwable) { - if (throwable == null) { + public void accept(Void v, Exception exception) { + if (exception == null) { promise.setSuccess(); } else { - promise.setFailure(throwable); + promise.setFailure(exception); } } @@ -212,17 +212,22 @@ public class NettyListener implements BiConsumer, ChannelPromis return promise.unvoid(); } - public static NettyListener fromBiConsumer(BiConsumer biConsumer, Channel channel) { + public static NettyListener fromBiConsumer(BiConsumer biConsumer, Channel channel) { if (biConsumer instanceof NettyListener) { return (NettyListener) biConsumer; } else { ChannelPromise channelPromise = channel.newPromise(); channelPromise.addListener(f -> { - if (f.cause() == null) { + Throwable cause = f.cause(); + if (cause == null) { biConsumer.accept(null, null); } else { - ExceptionsHelper.dieOnError(f.cause()); - biConsumer.accept(null, f.cause()); + if (cause instanceof Error) { + ExceptionsHelper.dieOnError(cause); + biConsumer.accept(null, new Exception(cause)); + } else { + biConsumer.accept(null, (Exception) cause); + } } }); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java index 97eba20a16f..61cafed86a5 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java @@ -120,7 +120,7 @@ public class NioHttpChannel extends AbstractRestChannel { toClose.add(nioChannel::close); } - BiConsumer listener = (aVoid, throwable) -> Releasables.close(toClose); + BiConsumer listener = (aVoid, ex) -> Releasables.close(toClose); nioChannel.getContext().sendMessage(new NioHttpResponse(sequence, resp), listener); success = true; } finally { diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 8579fb55613..f639f139b55 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -90,18 +90,12 @@ public interface ActionListener { * @param the type of the response * @return a bi consumer that will complete the wrapped listener */ - static BiConsumer toBiConsumer(ActionListener listener) { + static BiConsumer toBiConsumer(ActionListener listener) { return (response, throwable) -> { if (throwable == null) { listener.onResponse(response); } else { - if (throwable instanceof Exception) { - listener.onFailure((Exception) throwable); - } else if (throwable instanceof Error) { - throw (Error) throwable; - } else { - throw new AssertionError("Should have been either Error or Exception", throwable); - } + listener.onFailure(throwable); } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 8b814df8af9..37bf95d0b15 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.mocksocket.MockSocket; +import org.elasticsearch.common.concurrent.CompletableContext; import org.elasticsearch.threadpool.ThreadPool; import java.io.BufferedInputStream; @@ -50,7 +51,6 @@ import java.net.SocketException; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -218,7 +218,7 @@ public class MockTcpTransport extends TcpTransport { private final Socket activeChannel; private final String profile; private final CancellableThreads cancellableThreads = new CancellableThreads(); - private final CompletableFuture closeFuture = new CompletableFuture<>(); + private final CompletableContext closeFuture = new CompletableContext<>(); /** * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic. @@ -364,7 +364,7 @@ public class MockTcpTransport extends TcpTransport { @Override public void addCloseListener(ActionListener listener) { - closeFuture.whenComplete(ActionListener.toBiConsumer(listener)); + closeFuture.addListener(ActionListener.toBiConsumer(listener)); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index 075e6818393..171507de741 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -159,7 +159,7 @@ public final class SSLChannelContext extends SocketChannelContext { private static class CloseNotifyOperation implements WriteOperation { - private static final BiConsumer LISTENER = (v, t) -> {}; + private static final BiConsumer LISTENER = (v, t) -> {}; private static final Object WRITE_OBJECT = new Object(); private final SocketChannelContext channelContext; @@ -168,7 +168,7 @@ public final class SSLChannelContext extends SocketChannelContext { } @Override - public BiConsumer getListener() { + public BiConsumer getListener() { return LISTENER; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index fc501c68922..168dcd64e6c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -41,7 +41,7 @@ public class SSLChannelContextTests extends ESTestCase { private SSLChannelContext context; private InboundChannelBuffer channelBuffer; private SocketSelector selector; - private BiConsumer listener; + private BiConsumer listener; private Consumer exceptionHandler; private SSLDriver sslDriver; private ByteBuffer readBuffer = ByteBuffer.allocate(1 << 14); @@ -266,7 +266,7 @@ public class SSLChannelContextTests extends ESTestCase { @SuppressWarnings("unchecked") public void testMultipleWritesPartialFlushes() throws IOException { - BiConsumer listener2 = mock(BiConsumer.class); + BiConsumer listener2 = mock(BiConsumer.class); ByteBuffer[] buffers1 = {ByteBuffer.allocate(10)}; ByteBuffer[] buffers2 = {ByteBuffer.allocate(5)}; FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);