diff --git a/core/src/main/java/org/elasticsearch/transport/TcpChannel.java b/core/src/main/java/org/elasticsearch/transport/TcpChannel.java index ee2be4ed736..22453ac43b4 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -61,7 +60,7 @@ public interface TcpChannel extends Releasable { * * @param listener to be executed */ - void addCloseListener(ActionListener listener); + void addCloseListener(ActionListener listener); /** @@ -94,7 +93,7 @@ public interface TcpChannel extends Releasable { * @param reference to send to channel * @param listener to execute upon send completion */ - void sendMessage(BytesReference reference, ActionListener listener); + void sendMessage(BytesReference reference, ActionListener listener); /** * Closes the channel. @@ -114,10 +113,10 @@ public interface TcpChannel extends Releasable { */ static void closeChannels(List channels, boolean blocking) { if (blocking) { - ArrayList> futures = new ArrayList<>(channels.size()); + ArrayList> futures = new ArrayList<>(channels.size()); for (final C channel : channels) { if (channel.isOpen()) { - PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); channel.addCloseListener(closeFuture); channel.close(); futures.add(closeFuture); @@ -136,15 +135,14 @@ public interface TcpChannel extends Releasable { * @param discoveryNode the node for the pending connections * @param connectionFutures representing the pending connections * @param connectTimeout to wait for a connection - * @param the type of channel * @throws ConnectTransportException if one of the connections fails */ - static void awaitConnected(DiscoveryNode discoveryNode, List> connectionFutures, - TimeValue connectTimeout) throws ConnectTransportException { + static void awaitConnected(DiscoveryNode discoveryNode, List> connectionFutures, TimeValue connectTimeout) + throws ConnectTransportException { Exception connectionException = null; boolean allConnected = true; - for (ActionFuture connectionFuture : connectionFutures) { + for (ActionFuture connectionFuture : connectionFutures) { try { connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { @@ -169,8 +167,8 @@ public interface TcpChannel extends Releasable { } } - static void blockOnFutures(List> futures) { - for (ActionFuture future : futures) { + static void blockOnFutures(List> futures) { + for (ActionFuture future : futures) { try { future.get(); } catch (ExecutionException e) { diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0bf600cb029..bfcd3dff5ab 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -343,7 +343,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements for (TcpChannel channel : channels.getChannels()) { internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { @Override - protected void innerInnerOnResponse(TcpChannel channel) { + protected void innerInnerOnResponse(Void v) { successfulPings.inc(); } @@ -595,10 +595,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements int numConnections = connectionProfile.getNumConnections(); assert numConnections > 0 : "A connection profile must be configured with at least one connection"; List channels = new ArrayList<>(numConnections); - List> connectionFutures = new ArrayList<>(numConnections); + List> connectionFutures = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { try { - PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); connectionFutures.add(connectFuture); TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture); channels.add(channel); @@ -940,7 +940,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); List channels = entry.getValue(); - ActionListener closeFailLogger = ActionListener.wrap(c -> {}, + ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); TcpChannel.closeChannels(channels, true); @@ -1016,7 +1016,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override - protected void innerInnerOnResponse(TcpChannel channel) { + protected void innerInnerOnResponse(Void v) { TcpChannel.closeChannel(channel, false); } @@ -1060,7 +1060,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements * @return the pending connection * @throws IOException if an I/O exception occurs while opening the channel */ - protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException; /** @@ -1686,7 +1686,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements /** * This listener increments the transmitted bytes metric on success. */ - private abstract class SendMetricListener extends NotifyOnceListener { + private abstract class SendMetricListener extends NotifyOnceListener { private final long messageSize; private SendMetricListener(long messageSize) { @@ -1694,12 +1694,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } @Override - protected final void innerOnResponse(org.elasticsearch.transport.TcpChannel object) { + protected final void innerOnResponse(Void object) { transmittedBytesMetric.inc(messageSize); innerInnerOnResponse(object); } - protected abstract void innerInnerOnResponse(org.elasticsearch.transport.TcpChannel object); + protected abstract void innerInnerOnResponse(Void object); } private final class SendListener extends SendMetricListener { @@ -1715,7 +1715,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } @Override - protected void innerInnerOnResponse(TcpChannel channel) { + protected void innerInnerOnResponse(Void v) { release(); } diff --git a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 275c6dbaeb2..5182951a0fd 100644 --- a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -185,8 +185,8 @@ public class TcpTransportTests extends ESTestCase { } @Override - protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, - ActionListener connectListener) throws IOException { + protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + throws IOException { return new FakeChannel(messageCaptor); } @@ -251,7 +251,7 @@ public class TcpTransportTests extends ESTestCase { } @Override - public void addCloseListener(ActionListener listener) { + public void addCloseListener(ActionListener listener) { } @Override @@ -269,7 +269,7 @@ public class TcpTransportTests extends ESTestCase { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { + public void sendMessage(BytesReference reference, ActionListener listener) { messageCaptor.set(reference); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 29ff3967d6d..6343a241843 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -249,7 +249,7 @@ public class Netty4Transport extends TcpTransport { } @Override - protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener listener) + protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener listener) throws IOException { ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address()); Channel channel = channelFuture.channel(); @@ -264,7 +264,7 @@ public class Netty4Transport extends TcpTransport { channelFuture.addListener(f -> { if (f.isSuccess()) { - listener.onResponse(nettyChannel); + listener.onResponse(null); } else { Throwable cause = f.cause(); if (cause instanceof Error) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index 12ab34a32af..fa9989f7270 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -34,13 +34,13 @@ import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { private final Channel channel; - private final CompletableFuture closeContext = new CompletableFuture<>(); + private final CompletableFuture closeContext = new CompletableFuture<>(); NettyTcpChannel(Channel channel) { this.channel = channel; this.channel.closeFuture().addListener(f -> { if (f.isSuccess()) { - closeContext.complete(this); + closeContext.complete(null); } else { Throwable cause = f.cause(); if (cause instanceof Error) { @@ -59,7 +59,7 @@ public class NettyTcpChannel implements TcpChannel { } @Override - public void addCloseListener(ActionListener listener) { + public void addCloseListener(ActionListener listener) { closeContext.whenComplete(ActionListener.toBiConsumer(listener)); } @@ -79,11 +79,11 @@ public class NettyTcpChannel implements TcpChannel { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { + public void sendMessage(BytesReference reference, ActionListener listener) { final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference)); future.addListener(f -> { if (f.isSuccess()) { - listener.onResponse(this); + listener.onResponse(null); } else { final Throwable cause = f.cause(); Netty4Utils.maybeDie(cause); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 5d5e14b4061..f6ec96d13df 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -171,7 +171,7 @@ public class MockTcpTransport extends TcpTransport { } @Override - protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { InetSocketAddress address = node.getAddress().address(); final MockSocket socket = new MockSocket(); @@ -186,7 +186,7 @@ public class MockTcpTransport extends TcpTransport { MockChannel channel = new MockChannel(socket, address, "none", (c) -> {}); channel.loopRead(executor); success = true; - connectListener.onResponse(channel); + connectListener.onResponse(null); return channel; } finally { if (success == false) { @@ -231,7 +231,7 @@ public class MockTcpTransport extends TcpTransport { private final String profile; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final Closeable onClose; - private final CompletableFuture closeFuture = new CompletableFuture<>(); + private final CompletableFuture closeFuture = new CompletableFuture<>(); /** * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic. @@ -356,14 +356,14 @@ public class MockTcpTransport extends TcpTransport { public void close() { try { close0(); - closeFuture.complete(this); + closeFuture.complete(null); } catch (IOException e) { closeFuture.completeExceptionally(e); } } @Override - public void addCloseListener(ActionListener listener) { + public void addCloseListener(ActionListener listener) { closeFuture.whenComplete(ActionListener.toBiConsumer(listener)); } @@ -386,14 +386,14 @@ public class MockTcpTransport extends TcpTransport { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { + public void sendMessage(BytesReference reference, ActionListener listener) { try { synchronized (this) { OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream()); reference.writeTo(outputStream); outputStream.flush(); } - listener.onResponse(this); + listener.onResponse(null); } catch (IOException e) { listener.onFailure(e); onException(this, e); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 2902b0bccfd..822c0181ae7 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -32,7 +31,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; @@ -95,22 +93,11 @@ public class NioTransport extends TcpTransport { } @Override - protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get()); openChannels.clientChannelOpened(channel); - // TODO: Temporary conversion due to types - channel.addConnectListener(new ActionListener() { - @Override - public void onResponse(NioChannel nioChannel) { - connectListener.onResponse(nioChannel); - } - - @Override - public void onFailure(Exception e) { - connectListener.onFailure(e); - } - }); + channel.addConnectListener(connectListener); return channel; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java index 67ed2447f63..f91acc5bbea 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java @@ -24,7 +24,6 @@ import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import java.io.IOException; @@ -33,10 +32,10 @@ import java.util.ArrayList; public class WriteOperation { private final NioSocketChannel channel; - private final ActionListener listener; + private final ActionListener listener; private final NetworkBytesReference[] references; - public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { + public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { this.channel = channel; this.listener = listener; this.references = toArray(bytesReference); @@ -46,7 +45,7 @@ public class WriteOperation { return references; } - public ActionListener getListener() { + public ActionListener getListener() { return listener; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index 21f46631c6e..a2924eff56b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.ESSelector; import java.io.IOException; @@ -58,7 +57,7 @@ public abstract class AbstractNioChannel closeContext = new CompletableFuture<>(); + private final CompletableFuture closeContext = new CompletableFuture<>(); private final ESSelector selector; private SelectionKey selectionKey; @@ -111,7 +110,7 @@ public abstract class AbstractNioChannel listener) { + public void addCloseListener(ActionListener listener) { closeContext.whenComplete(ActionListener.toBiConsumer(listener)); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java index 5b365fbd36c..643496ac39b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java @@ -21,12 +21,10 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.AcceptingSelector; import java.io.IOException; import java.nio.channels.ServerSocketChannel; -import java.util.concurrent.Future; public class NioServerSocketChannel extends AbstractNioChannel { @@ -43,7 +41,7 @@ public class NioServerSocketChannel extends AbstractNioChannel listener) { + public void sendMessage(BytesReference reference, ActionListener listener) { throw new UnsupportedOperationException("Cannot send a message to a server channel."); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index 520cefd27f9..fb2d940348a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -21,7 +21,6 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.NetworkBytesReference; import org.elasticsearch.transport.nio.SocketSelector; @@ -36,7 +35,7 @@ import java.util.concurrent.CompletableFuture; public class NioSocketChannel extends AbstractNioChannel { private final InetSocketAddress remoteAddress; - private final CompletableFuture connectContext = new CompletableFuture<>(); + private final CompletableFuture connectContext = new CompletableFuture<>(); private final SocketSelector socketSelector; private WriteContext writeContext; private ReadContext readContext; @@ -49,19 +48,8 @@ public class NioSocketChannel extends AbstractNioChannel { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { - // TODO: Temporary conversion due to types - writeContext.sendMessage(reference, new ActionListener() { - @Override - public void onResponse(NioChannel nioChannel) { - listener.onResponse(nioChannel); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + public void sendMessage(BytesReference reference, ActionListener listener) { + writeContext.sendMessage(reference, listener); } @Override @@ -169,12 +157,12 @@ public class NioSocketChannel extends AbstractNioChannel { isConnected = internalFinish(); } if (isConnected) { - connectContext.complete(this); + connectContext.complete(null); } return isConnected; } - public void addConnectListener(ActionListener listener) { + public void addConnectListener(ActionListener listener) { connectContext.whenComplete(ActionListener.toBiConsumer(listener)); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java index 03eb652e1ae..d38cd1320d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java @@ -38,7 +38,7 @@ public class TcpWriteContext implements WriteContext { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { + public void sendMessage(BytesReference reference, ActionListener listener) { if (channel.isWritable() == false) { listener.onFailure(new ClosedChannelException()); return; @@ -96,7 +96,7 @@ public class TcpWriteContext implements WriteContext { } if (headOp.isFullyFlushed()) { - headOp.getListener().onResponse(channel); + headOp.getListener().onResponse(null); } else { queued.push(headOp); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java index 1a14d279dd2..718b7daf8c6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java @@ -27,7 +27,7 @@ import java.io.IOException; public interface WriteContext { - void sendMessage(BytesReference reference, ActionListener listener); + void sendMessage(BytesReference reference, ActionListener listener); void queueWriteOperations(WriteOperation writeOperation); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java index 670134d9bee..0de1bb72063 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java @@ -53,7 +53,7 @@ public class SocketSelectorTests extends ESTestCase { private NioSocketChannel channel; private TestSelectionKey selectionKey; private WriteContext writeContext; - private ActionListener listener; + private ActionListener listener; private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1])); private Selector rawSelector; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java index d7284491d64..1f6f95e62af 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java @@ -36,7 +36,7 @@ import static org.mockito.Mockito.when; public class WriteOperationTests extends ESTestCase { private NioSocketChannel channel; - private ActionListener listener; + private ActionListener listener; @Before @SuppressWarnings("unchecked") diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 62f87d4f574..93d905c8068 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.AcceptingSelector; @@ -61,26 +62,39 @@ public class NioServerSocketChannelTests extends ESTestCase { } public void testClose() throws Exception { - AtomicReference ref = new AtomicReference<>(); + AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); - Consumer listener = (c) -> { - ref.set(c); - latch.countDown(); - }; - channel.addCloseListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel))); + + channel.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void o) { + isClosed.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + isClosed.set(true); + latch.countDown(); + } + }); assertTrue(channel.isOpen()); assertFalse(closedRawChannel.get()); + assertFalse(isClosed.get()); - TcpChannel.closeChannel(channel, true); + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + channel.addCloseListener(closeFuture); + channel.close(); + closeFuture.actionGet(); assertTrue(closedRawChannel.get()); assertFalse(channel.isOpen()); latch.await(); - assertSame(channel, ref.get()); + assertTrue(isClosed.get()); } private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index d8d4b41df70..1fb32d0f6e1 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -68,29 +68,40 @@ public class NioSocketChannelTests extends ESTestCase { } public void testClose() throws Exception { - AtomicReference ref = new AtomicReference<>(); + AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); openChannels.clientChannelOpened(socketChannel); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); - Consumer listener = (c) -> { - ref.set(c); - latch.countDown(); - }; - socketChannel.addCloseListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel))); + socketChannel.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void o) { + isClosed.set(true); + latch.countDown(); + } + @Override + public void onFailure(Exception e) { + isClosed.set(true); + latch.countDown(); + } + }); assertTrue(socketChannel.isOpen()); assertFalse(closedRawChannel.get()); + assertFalse(isClosed.get()); assertTrue(openChannels.getClientChannels().containsKey(socketChannel)); - TcpChannel.closeChannel(socketChannel, true); + PlainActionFuture closeFuture = PlainActionFuture.newFuture(); + socketChannel.addCloseListener(closeFuture); + socketChannel.close(); + closeFuture.actionGet(); assertTrue(closedRawChannel.get()); assertFalse(socketChannel.isOpen()); assertFalse(openChannels.getClientChannels().containsKey(socketChannel)); latch.await(); - assertSame(socketChannel, ref.get()); + assertTrue(isClosed.get()); } public void testConnectSucceeds() throws Exception { @@ -100,7 +111,7 @@ public class NioSocketChannelTests extends ESTestCase { socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); - PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); socketChannel.addConnectListener(connectFuture); connectFuture.get(100, TimeUnit.SECONDS); @@ -116,7 +127,7 @@ public class NioSocketChannelTests extends ESTestCase { socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); - PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); socketChannel.addConnectListener(connectFuture); ExecutionException e = expectThrows(ExecutionException.class, () -> connectFuture.get(100, TimeUnit.SECONDS)); assertTrue(e.getCause() instanceof IOException); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java index b4fab855403..7e6410b6c61 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java @@ -40,7 +40,7 @@ import static org.mockito.Mockito.when; public class TcpWriteContextTests extends ESTestCase { private SocketSelector selector; - private ActionListener listener; + private ActionListener listener; private TcpWriteContext writeContext; private NioSocketChannel channel; @@ -136,7 +136,7 @@ public class TcpWriteContextTests extends ESTestCase { writeContext.flushChannel(); verify(writeOperation).flush(); - verify(listener).onResponse(channel); + verify(listener).onResponse(null); assertFalse(writeContext.hasQueuedWriteOps()); } @@ -151,7 +151,7 @@ public class TcpWriteContextTests extends ESTestCase { when(writeOperation.isFullyFlushed()).thenReturn(false); writeContext.flushChannel(); - verify(listener, times(0)).onResponse(channel); + verify(listener, times(0)).onResponse(null); assertTrue(writeContext.hasQueuedWriteOps()); } @@ -173,7 +173,7 @@ public class TcpWriteContextTests extends ESTestCase { when(writeOperation2.isFullyFlushed()).thenReturn(false); writeContext.flushChannel(); - verify(listener).onResponse(channel); + verify(listener).onResponse(null); verify(listener2, times(0)).onResponse(channel); assertTrue(writeContext.hasQueuedWriteOps()); @@ -181,7 +181,7 @@ public class TcpWriteContextTests extends ESTestCase { writeContext.flushChannel(); - verify(listener2).onResponse(channel); + verify(listener2).onResponse(null); assertFalse(writeContext.hasQueuedWriteOps()); }