From 80ef9bbdb1cb42583eff3180e5565b8cf1305064 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 16 Nov 2017 11:19:36 -0700 Subject: [PATCH] Remove parameterization from TcpTransport (#27407) This commit is a follow up to the work completed in #27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type. --- .../elasticsearch/transport/TcpChannel.java | 22 ++- .../elasticsearch/transport/TcpTransport.java | 126 ++++++++---------- .../transport/TcpTransportChannel.java | 10 +- .../transport/TcpTransportTests.java | 90 +++++++------ .../transport/netty4/Netty4Transport.java | 28 +--- .../transport/netty4/NettyTcpChannel.java | 33 ++++- .../transport/netty4/Netty4TransportIT.java | 4 +- .../netty4/NettyTransportMultiPortTests.java | 11 +- .../netty4/SimpleNetty4TransportTests.java | 4 +- .../AbstractSimpleTransportTestCase.java | 2 +- .../transport/MockTcpTransport.java | 44 +++--- .../transport/nio/NioTransport.java | 34 +++-- .../nio/channel/NioServerSocketChannel.java | 8 ++ .../nio/channel/NioSocketChannel.java | 18 +++ .../transport/MockTcpTransportTests.java | 6 +- .../nio/SimpleNioTransportTests.java | 5 +- 16 files changed, 239 insertions(+), 206 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/TcpChannel.java b/core/src/main/java/org/elasticsearch/transport/TcpChannel.java index f429e71f4a8..ee2be4ed736 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -19,19 +19,19 @@ package org.elasticsearch.transport; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; @@ -80,6 +80,22 @@ public interface TcpChannel extends Releasable { */ boolean isOpen(); + /** + * Returns the local address for this channel. + * + * @return the local address of this channel. + */ + InetSocketAddress getLocalAddress(); + + /** + * Sends a tcp message to the channel. The listener will be executed once the send process has been + * completed. + * + * @param reference to send to channel + * @param listener to execute upon send completion + */ + void sendMessage(BytesReference reference, ActionListener listener); + /** * Closes the channel. * diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 4092eb62569..d5aed81ae80 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -118,7 +118,7 @@ import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseC import static org.elasticsearch.common.transport.NetworkExceptionHelper.isConnectException; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport { +public abstract class TcpTransport extends AbstractLifecycleComponent implements Transport { public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker"; public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss"; @@ -199,8 +199,8 @@ public abstract class TcpTransport extends AbstractL protected final ConcurrentMap connectedNodes = newConcurrentMap(); protected final ConcurrentMap profileBoundAddresses = newConcurrentMap(); - private final Map> serverChannels = newConcurrentMap(); - private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Map> serverChannels = newConcurrentMap(); + private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); protected final KeyedLock connectionLock = new KeyedLock<>(); private final NamedWriteableRegistry namedWriteableRegistry; @@ -340,10 +340,10 @@ public abstract class TcpTransport extends AbstractL for (Map.Entry entry : connectedNodes.entrySet()) { DiscoveryNode node = entry.getKey(); NodeChannels channels = entry.getValue(); - for (Channel channel : channels.getChannels()) { - internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { + for (TcpChannel channel : channels.getChannels()) { + internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) { @Override - protected void innerInnerOnResponse(Channel channel) { + protected void innerInnerOnResponse(TcpChannel channel) { successfulPings.inc(); } @@ -397,12 +397,12 @@ public abstract class TcpTransport extends AbstractL public final class NodeChannels implements Connection { private final Map typeMapping; - private final List channels; + private final List channels; private final DiscoveryNode node; private final AtomicBoolean closed = new AtomicBoolean(false); private final Version version; - NodeChannels(DiscoveryNode node, List channels, ConnectionProfile connectionProfile, Version handshakeVersion) { + NodeChannels(DiscoveryNode node, List channels, ConnectionProfile connectionProfile, Version handshakeVersion) { this.node = node; this.channels = Collections.unmodifiableList(channels); assert channels.size() == connectionProfile.getNumConnections() : "expected channels size to be == " @@ -420,11 +420,11 @@ public abstract class TcpTransport extends AbstractL return version; } - public List getChannels() { + public List getChannels() { return channels; } - public Channel channel(TransportRequestOptions.Type type) { + public TcpChannel channel(TransportRequestOptions.Type type) { ConnectionProfile.ConnectionTypeHandle connectionTypeHandle = typeMapping.get(type); if (connectionTypeHandle == null) { throw new IllegalArgumentException("no type channel for [" + type + "]"); @@ -477,7 +477,7 @@ public abstract class TcpTransport extends AbstractL if (closed.get()) { throw new NodeNotConnectedException(node, "connection already closed"); } - Channel channel = channel(options.type()); + TcpChannel channel = channel(options.type()); sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0); } @@ -594,13 +594,13 @@ public abstract class TcpTransport extends AbstractL try { int numConnections = connectionProfile.getNumConnections(); assert numConnections > 0 : "A connection profile must be configured with at least one connection"; - List channels = new ArrayList<>(numConnections); - List> connectionFutures = new ArrayList<>(numConnections); + List channels = new ArrayList<>(numConnections); + List> connectionFutures = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { try { - PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); connectionFutures.add(connectFuture); - Channel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture); + TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture); channels.add(channel); } catch (Exception e) { // If there was an exception when attempting to instantiate the raw channels, we close all of the channels @@ -618,7 +618,7 @@ public abstract class TcpTransport extends AbstractL } // If we make it past the block above, we have successfully established connections for all of the channels - final Channel handshakeChannel = channels.get(0); // one channel is guaranteed by the connection profile + final TcpChannel handshakeChannel = channels.get(0); // one channel is guaranteed by the connection profile handshakeChannel.addCloseListener(ActionListener.wrap(() -> cancelHandshakeForChannel(handshakeChannel))); Version version; try { @@ -635,7 +635,7 @@ public abstract class TcpTransport extends AbstractL transportService.onConnectionOpened(nodeChannels); final NodeChannels finalNodeChannels = nodeChannels; final AtomicBoolean runOnce = new AtomicBoolean(false); - Consumer onClose = c -> { + Consumer onClose = c -> { assert c.isOpen() == false : "channel is still open when onClose is called"; // we only need to disconnect from the nodes once since all other channels // will also try to run this we protect it from running multiple times. @@ -772,15 +772,15 @@ public abstract class TcpTransport extends AbstractL final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(portNumber -> { try { - Channel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); + TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber)); synchronized (serverChannels) { - List list = serverChannels.get(name); + List list = serverChannels.get(name); if (list == null) { list = new ArrayList<>(); serverChannels.put(name, list); } list.add(channel); - boundSocket.set(getLocalAddress(channel)); + boundSocket.set(channel.getLocalAddress()); } } catch (Exception e) { lastException.set(e); @@ -937,9 +937,9 @@ public abstract class TcpTransport extends AbstractL closeLock.writeLock().lock(); try { // first stop to accept any incoming connections so nobody can connect to this transport - for (Map.Entry> entry : serverChannels.entrySet()) { + for (Map.Entry> entry : serverChannels.entrySet()) { String profile = entry.getKey(); - List channels = entry.getValue(); + List channels = entry.getValue(); ActionListener closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e)); channels.forEach(c -> c.addCloseListener(closeFailLogger)); @@ -979,7 +979,7 @@ public abstract class TcpTransport extends AbstractL } } - protected void onException(Channel channel, Exception e) { + protected void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources TcpChannel.closeChannel(channel, false); @@ -1014,9 +1014,9 @@ public abstract class TcpTransport extends AbstractL // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)); - final SendMetricListener closeChannel = new SendMetricListener(message.length()) { + final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override - protected void innerInnerOnResponse(Channel channel) { + protected void innerInnerOnResponse(TcpChannel channel) { TcpChannel.closeChannel(channel, false); } @@ -1036,34 +1036,19 @@ public abstract class TcpTransport extends AbstractL } } - protected void serverAcceptedChannel(Channel channel) { + protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accept channel set once"; channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel))); } - /** - * Returns the channels local address - */ - protected abstract InetSocketAddress getLocalAddress(Channel channel); - /** * Binds to the given {@link InetSocketAddress} * * @param name the profile name * @param address the address to bind to */ - protected abstract Channel bind(String name, InetSocketAddress address) throws IOException; - - /** - * Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception - * is thrown during the send. If an exception is thrown, the listener's onException method will be called. - * - * @param channel the destination channel - * @param reference the byte reference for the message - * @param listener the listener to call when the operation has completed - */ - protected abstract void sendMessage(Channel channel, BytesReference reference, ActionListener listener); + protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException; /** * Initiate a single tcp socket channel to a node. Implementations do not have to observe the connectTimeout. @@ -1075,7 +1060,7 @@ public abstract class TcpTransport extends AbstractL * @return the pending connection * @throws IOException if an I/O exception occurs while opening the channel */ - protected abstract Channel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected abstract TcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException; /** @@ -1088,7 +1073,7 @@ public abstract class TcpTransport extends AbstractL return compress && (!(request instanceof BytesTransportRequest)); } - private void sendRequestToChannel(final DiscoveryNode node, final Channel targetChannel, final long requestId, final String action, + private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options, Version channelVersion, byte status) throws IOException, TransportException { @@ -1120,9 +1105,9 @@ public abstract class TcpTransport extends AbstractL BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; // this might be called in a different thread - SendListener onRequestSent = new SendListener(stream, + SendListener onRequestSent = new SendListener(channel, stream, () -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length()); - internalSendMessage(targetChannel, message, onRequestSent); + internalSendMessage(channel, message, onRequestSent); addedReleaseListener = true; } finally { if (!addedReleaseListener) { @@ -1134,13 +1119,13 @@ public abstract class TcpTransport extends AbstractL /** * sends a message to the given channel, using the given callbacks. */ - private void internalSendMessage(Channel targetChannel, BytesReference message, SendMetricListener listener) { + private void internalSendMessage(TcpChannel channel, BytesReference message, SendMetricListener listener) { try { - sendMessage(targetChannel, message, listener); + channel.sendMessage(message, listener); } catch (Exception ex) { // call listener to ensure that any resources are released listener.onFailure(ex); - onException(targetChannel, ex); + onException(channel, ex); } } @@ -1153,12 +1138,12 @@ public abstract class TcpTransport extends AbstractL * @param requestId the request ID this response replies to * @param action the action this response replies to */ - public void sendErrorResponse(Version nodeVersion, Channel channel, final Exception error, final long requestId, + public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exception error, final long requestId, final String action) throws IOException { try (BytesStreamOutput stream = new BytesStreamOutput()) { stream.setVersion(nodeVersion); RemoteTransportException tx = new RemoteTransportException( - nodeName(), new TransportAddress(getLocalAddress(channel)), action, error); + nodeName(), new TransportAddress(channel.getLocalAddress()), action, error); threadPool.getThreadContext().writeTo(stream); stream.writeException(tx); byte status = 0; @@ -1167,7 +1152,7 @@ public abstract class TcpTransport extends AbstractL final BytesReference bytes = stream.bytes(); final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); CompositeBytesReference message = new CompositeBytesReference(header, bytes); - SendListener onResponseSent = new SendListener(null, + SendListener onResponseSent = new SendListener(channel, null, () -> transportService.onResponseSent(requestId, action, error), message.length()); internalSendMessage(channel, message, onResponseSent); } @@ -1178,12 +1163,12 @@ public abstract class TcpTransport extends AbstractL * * @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller */ - public void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId, + public void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId, final String action, TransportResponseOptions options) throws IOException { sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0); } - private void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId, + private void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId, final String action, TransportResponseOptions options, byte status) throws IOException { if (compress) { options = TransportResponseOptions.builder(options).withCompress(true).build(); @@ -1202,7 +1187,7 @@ public abstract class TcpTransport extends AbstractL final TransportResponseOptions finalOptions = options; // this might be called in a different thread - SendListener listener = new SendListener(stream, + SendListener listener = new SendListener(channel, stream, () -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length()); internalSendMessage(channel, message, listener); addedReleaseListener = true; @@ -1355,7 +1340,7 @@ public abstract class TcpTransport extends AbstractL /** * This method handles the message receive part for both request and responses */ - public final void messageReceived(BytesReference reference, Channel channel, String profileName, + public final void messageReceived(BytesReference reference, TcpChannel channel, String profileName, InetSocketAddress remoteAddress, int messageLengthBytes) throws IOException { final int totalMessageSize = messageLengthBytes + TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; readBytesMetric.inc(totalMessageSize); @@ -1494,8 +1479,9 @@ public abstract class TcpTransport extends AbstractL }); } - protected String handleRequest(Channel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes, - Version version, InetSocketAddress remoteAddress, byte status) throws IOException { + protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId, + int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) + throws IOException { final String action = stream.readString(); transportService.onRequestReceived(requestId, action); TransportChannel transportChannel = null; @@ -1514,7 +1500,7 @@ public abstract class TcpTransport extends AbstractL } else { getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); } - transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, + transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, messageLengthBytes); final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(remoteAddress)); @@ -1525,7 +1511,7 @@ public abstract class TcpTransport extends AbstractL } catch (Exception e) { // the circuit breaker tripped if (transportChannel == null) { - transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, 0); + transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, 0); } try { transportChannel.sendResponse(e); @@ -1611,7 +1597,8 @@ public abstract class TcpTransport extends AbstractL } } - protected Version executeHandshake(DiscoveryNode node, Channel channel, TimeValue timeout) throws IOException, InterruptedException { + protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) + throws IOException, InterruptedException { numHandshakes.inc(); final long requestId = newRequestId(); final HandshakeResponseHandler handler = new HandshakeResponseHandler(channel); @@ -1671,7 +1658,7 @@ public abstract class TcpTransport extends AbstractL /** * Called once the channel is closed for instance due to a disconnect or a closed socket etc. */ - private void cancelHandshakeForChannel(Channel channel) { + private void cancelHandshakeForChannel(TcpChannel channel) { final Optional first = pendingHandshakes.entrySet().stream() .filter((entry) -> entry.getValue().channel == channel).map(Map.Entry::getKey).findFirst(); if (first.isPresent()) { @@ -1699,7 +1686,7 @@ public abstract class TcpTransport extends AbstractL /** * This listener increments the transmitted bytes metric on success. */ - private abstract class SendMetricListener extends NotifyOnceListener { + private abstract class SendMetricListener extends NotifyOnceListener { private final long messageSize; private SendMetricListener(long messageSize) { @@ -1707,31 +1694,34 @@ public abstract class TcpTransport extends AbstractL } @Override - protected final void innerOnResponse(T object) { + protected final void innerOnResponse(org.elasticsearch.transport.TcpChannel object) { transmittedBytesMetric.inc(messageSize); innerInnerOnResponse(object); } - protected abstract void innerInnerOnResponse(T object); + protected abstract void innerInnerOnResponse(org.elasticsearch.transport.TcpChannel object); } - private final class SendListener extends SendMetricListener { + private final class SendListener extends SendMetricListener { + private final TcpChannel channel; private final Releasable optionalReleasable; private final Runnable transportAdaptorCallback; - private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { + private SendListener(TcpChannel channel, Releasable optionalReleasable, Runnable transportAdaptorCallback, long messageLength) { super(messageLength); + this.channel = channel; this.optionalReleasable = optionalReleasable; this.transportAdaptorCallback = transportAdaptorCallback; } @Override - protected void innerInnerOnResponse(Channel channel) { + protected void innerInnerOnResponse(TcpChannel channel) { release(); } @Override protected void innerOnFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e); release(); } diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index 3267548e914..eb4c244c7a9 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -23,8 +23,8 @@ import org.elasticsearch.Version; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -public final class TcpTransportChannel implements TransportChannel { - private final TcpTransport transport; +public final class TcpTransportChannel implements TransportChannel { + private final TcpTransport transport; private final Version version; private final String action; private final long requestId; @@ -32,9 +32,9 @@ public final class TcpTransportChannel implements Tr private final long reservedBytes; private final AtomicBoolean released = new AtomicBoolean(); private final String channelType; - private final Channel channel; + private final TcpChannel channel; - TcpTransportChannel(TcpTransport transport, Channel channel, String channelType, String action, + TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version, String profileName, long reservedBytes) { this.version = version; this.channel = channel; @@ -97,7 +97,7 @@ public final class TcpTransportChannel implements Tr return version; } - public Channel getChannel() { + public TcpChannel getChannel() { return channel; } } diff --git a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 19ada600cc1..275c6dbaeb2 100644 --- a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -39,7 +39,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; @@ -172,57 +171,23 @@ public class TcpTransportTests extends ESTestCase { public void testCompressRequest() throws IOException { final boolean compressed = randomBoolean(); - final AtomicBoolean called = new AtomicBoolean(false); Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100)); ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); - AtomicReference exceptionReference = new AtomicReference<>(); + AtomicReference messageCaptor = new AtomicReference<>(); try { - TcpTransport transport = new TcpTransport( + TcpTransport transport = new TcpTransport( "test", Settings.builder().put("transport.tcp.compress", compressed).build(), threadPool, new BigArrays(Settings.EMPTY, null), null, null, null) { - @Override - protected InetSocketAddress getLocalAddress(FakeChannel o) { - return null; - } @Override protected FakeChannel bind(String name, InetSocketAddress address) throws IOException { return null; } - @Override - protected void sendMessage(FakeChannel o, BytesReference reference, ActionListener listener) { - try { - StreamInput streamIn = reference.streamInput(); - streamIn.skip(TcpHeader.MARKER_BYTES_SIZE); - int len = streamIn.readInt(); - long requestId = streamIn.readLong(); - assertEquals(42, requestId); - byte status = streamIn.readByte(); - Version version = Version.fromId(streamIn.readInt()); - assertEquals(Version.CURRENT, version); - assertEquals(compressed, TransportStatus.isCompress(status)); - called.compareAndSet(false, true); - if (compressed) { - final int bytesConsumed = TcpHeader.HEADER_SIZE; - streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed)) - .streamInput(streamIn); - } - threadPool.getThreadContext().readHeaders(streamIn); - assertEquals("foobar", streamIn.readString()); - Req readReq = new Req(""); - readReq.readFrom(streamIn); - assertEquals(request.value, readReq.value); - } catch (IOException e) { - exceptionReference.set(e); - } - } - @Override protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, - ActionListener connectListener) throws IOException { - FakeChannel fakeChannel = new FakeChannel(); - return fakeChannel; + ActionListener connectListener) throws IOException { + return new FakeChannel(messageCaptor); } @Override @@ -233,18 +198,41 @@ public class TcpTransportTests extends ESTestCase { @Override public NodeChannels getConnection(DiscoveryNode node) { int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); - ArrayList fakeChannels = new ArrayList<>(numConnections); + ArrayList fakeChannels = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { - fakeChannels.add(new FakeChannel()); + fakeChannels.add(new FakeChannel(messageCaptor)); } return new NodeChannels(node, fakeChannels, MockTcpTransport.LIGHT_PROFILE, Version.CURRENT); } }; + DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); Transport.Connection connection = transport.getConnection(node); connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY); - assertTrue(called.get()); - assertNull("IOException while sending message.", exceptionReference.get()); + + BytesReference reference = messageCaptor.get(); + assertNotNull(reference); + + StreamInput streamIn = reference.streamInput(); + streamIn.skip(TcpHeader.MARKER_BYTES_SIZE); + int len = streamIn.readInt(); + long requestId = streamIn.readLong(); + assertEquals(42, requestId); + byte status = streamIn.readByte(); + Version version = Version.fromId(streamIn.readInt()); + assertEquals(Version.CURRENT, version); + assertEquals(compressed, TransportStatus.isCompress(status)); + if (compressed) { + final int bytesConsumed = TcpHeader.HEADER_SIZE; + streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed)) + .streamInput(streamIn); + } + threadPool.getThreadContext().readHeaders(streamIn); + assertEquals("foobar", streamIn.readString()); + Req readReq = new Req(""); + readReq.readFrom(streamIn); + assertEquals(request.value, readReq.value); + } finally { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } @@ -252,6 +240,12 @@ public class TcpTransportTests extends ESTestCase { private static final class FakeChannel implements TcpChannel { + private final AtomicReference messageCaptor; + + FakeChannel(AtomicReference messageCaptor) { + this.messageCaptor = messageCaptor; + } + @Override public void close() { } @@ -268,6 +262,16 @@ public class TcpTransportTests extends ESTestCase { public boolean isOpen() { return false; } + + @Override + public InetSocketAddress getLocalAddress() { + return null; + } + + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + messageCaptor.set(reference); + } } private static final class Req extends TransportRequest { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 9cdefc292f2..29ff3967d6d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -42,7 +42,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.lease.Releasables; @@ -57,6 +56,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; @@ -79,7 +79,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF * longer. Med is for the typical search / single doc index. And High for things like cluster state. Ping is reserved for * sending out ping requests to other nodes. */ -public class Netty4Transport extends TcpTransport { +public class Netty4Transport extends TcpTransport { static { Netty4Utils.setup(); @@ -249,7 +249,7 @@ public class Netty4Transport extends TcpTransport { } @Override - protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener listener) + protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener listener) throws IOException { ChannelFuture channelFuture = bootstrap.connect(node.getAddress().address()); Channel channel = channelFuture.channel(); @@ -279,28 +279,6 @@ public class Netty4Transport extends TcpTransport { return nettyChannel; } - @Override - protected void sendMessage(NettyTcpChannel channel, BytesReference reference, ActionListener listener) { - final ChannelFuture future = channel.getLowLevelChannel().writeAndFlush(Netty4Utils.toByteBuf(reference)); - future.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(channel); - } else { - final Throwable cause = f.cause(); - Netty4Utils.maybeDie(cause); - logger.warn((Supplier) () -> - new ParameterizedMessage("write and flush on the network layer failed (channel: {})", channel), cause); - assert cause instanceof Exception; - listener.onFailure((Exception) cause); - } - }); - } - - @Override - protected InetSocketAddress getLocalAddress(NettyTcpChannel channel) { - return (InetSocketAddress) channel.getLowLevelChannel().localAddress(); - } - @Override protected NettyTcpChannel bind(String name, InetSocketAddress address) { Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index c18c3c4fe1f..12ab34a32af 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -20,10 +20,15 @@ package org.elasticsearch.transport.netty4; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpChannel; +import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { @@ -48,10 +53,6 @@ public class NettyTcpChannel implements TcpChannel { }); } - public Channel getLowLevelChannel() { - return channel; - } - @Override public void close() { channel.close(); @@ -71,4 +72,28 @@ public class NettyTcpChannel implements TcpChannel { public boolean isOpen() { return channel.isOpen(); } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) channel.localAddress(); + } + + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference)); + future.addListener(f -> { + if (f.isSuccess()) { + listener.onResponse(this); + } else { + final Throwable cause = f.cause(); + Netty4Utils.maybeDie(cause); + assert cause instanceof Exception; + listener.onFailure((Exception) cause); + } + }); + } + + public Channel getLowLevelChannel() { + return channel; + } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java index 3eb5adc8d06..04a2b8131f9 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -108,7 +109,8 @@ public class Netty4TransportIT extends ESNetty4IntegTestCase { super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService); } - protected String handleRequest(NettyTcpChannel channel, String profileName, + @Override + protected String handleRequest(TcpChannel channel, String profileName, StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException { String action = super.handleRequest(channel, profileName, stream, requestId, messageLengthBytes, version, diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java index 295e7ab389c..cde939bab8d 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportService; import org.junit.Before; import java.util.Collections; @@ -59,7 +58,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { .build(); ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { + try (TcpTransport transport = startTransport(settings, threadPool)) { assertEquals(1, transport.profileBoundAddresses().size()); assertEquals(1, transport.boundAddress().boundAddresses().length); } finally { @@ -75,7 +74,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { .build(); ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { + try (TcpTransport transport = startTransport(settings, threadPool)) { assertEquals(1, transport.profileBoundAddresses().size()); assertEquals(1, transport.boundAddress().boundAddresses().length); } finally { @@ -108,7 +107,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { .build(); ThreadPool threadPool = new TestThreadPool("tst"); - try (TcpTransport transport = startTransport(settings, threadPool)) { + try (TcpTransport transport = startTransport(settings, threadPool)) { assertEquals(0, transport.profileBoundAddresses().size()); assertEquals(1, transport.boundAddress().boundAddresses().length); } finally { @@ -116,9 +115,9 @@ public class NettyTransportMultiPortTests extends ESTestCase { } } - private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { + private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()); - TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), + TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()), bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); transport.start(); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 47259a7c613..b2126b1b611 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -58,7 +58,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override - protected Version executeHandshake(DiscoveryNode node, NettyTcpChannel channel, TimeValue timeout) throws IOException, + protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, InterruptedException { if (doHandshake) { return super.executeHandshake(node, channel, timeout); @@ -90,7 +90,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { final Netty4Transport t = (Netty4Transport) transport; @SuppressWarnings("unchecked") - final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; + final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 2cd4ef94ae0..a45411324b0 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -1976,7 +1976,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { MockTcpTransport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) { @Override - protected String handleRequest(MockChannel mockChannel, String profileName, StreamInput stream, long requestId, + protected String handleRequest(TcpChannel mockChannel, String profileName, StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException { return super.handleRequest(mockChannel, profileName, stream, requestId, messageLengthBytes, version, remoteAddress, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 4b1da5c2126..5d5e14b4061 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -68,7 +68,7 @@ import java.util.function.Consumer; * that need real networking. This implementation is a test only implementation that implements * the networking layer in the worst possible way since it blocks and uses a thread per request model. */ -public class MockTcpTransport extends TcpTransport { +public class MockTcpTransport extends TcpTransport { /** * A pre-built light connection profile that shares a single connection across all @@ -109,11 +109,6 @@ public class MockTcpTransport extends TcpTransport this.mockVersion = mockVersion; } - @Override - protected InetSocketAddress getLocalAddress(MockChannel mockChannel) { - return mockChannel.localAddress; - } - @Override protected MockChannel bind(final String name, InetSocketAddress address) throws IOException { MockServerSocket socket = new MockServerSocket(); @@ -176,7 +171,7 @@ public class MockTcpTransport extends TcpTransport } @Override - protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected MockChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { InetSocketAddress address = node.getAddress().address(); final MockSocket socket = new MockSocket(); @@ -222,22 +217,6 @@ public class MockTcpTransport extends TcpTransport socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } - @Override - protected void sendMessage(MockChannel mockChannel, BytesReference reference, ActionListener listener) { - try { - synchronized (mockChannel) { - final Socket socket = mockChannel.activeChannel; - OutputStream outputStream = new BufferedOutputStream(socket.getOutputStream()); - reference.writeTo(outputStream); - outputStream.flush(); - } - listener.onResponse(mockChannel); - } catch (IOException e) { - listener.onFailure(e); - onException(mockChannel, e); - } - } - @Override public long getNumOpenServerConnections() { return 1; @@ -401,6 +380,25 @@ public class MockTcpTransport extends TcpTransport return isOpen.get(); } + @Override + public InetSocketAddress getLocalAddress() { + return localAddress; + } + + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + try { + synchronized (this) { + OutputStream outputStream = new BufferedOutputStream(activeChannel.getOutputStream()); + reference.writeTo(outputStream); + outputStream.flush(); + } + listener.onResponse(this); + } catch (IOException e) { + listener.onFailure(e); + onException(this, e); + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index fc8d361b82e..27cb73f98f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Setting; @@ -33,6 +32,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; @@ -54,7 +54,7 @@ import static org.elasticsearch.common.settings.Setting.intSetting; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; -public class NioTransport extends TcpTransport { +public class NioTransport extends TcpTransport { public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; @@ -87,11 +87,6 @@ public class NioTransport extends TcpTransport { return openChannels.serverChannelsCount(); } - @Override - protected InetSocketAddress getLocalAddress(NioChannel channel) { - return channel.getLocalAddress(); - } - @Override protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { ChannelFactory channelFactory = this.profileToChannelFactory.get(name); @@ -100,21 +95,22 @@ public class NioTransport extends TcpTransport { } @Override - protected void sendMessage(NioChannel channel, BytesReference reference, ActionListener listener) { - if (channel instanceof NioSocketChannel) { - NioSocketChannel nioSocketChannel = (NioSocketChannel) channel; - nioSocketChannel.getWriteContext().sendMessage(reference, listener); - } else { - logger.error("cannot send message to channel of this type [{}]", channel.getClass()); - } - } - - @Override - protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) + protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get()); openChannels.clientChannelOpened(channel); - channel.addConnectListener(connectListener); + // TODO: Temporary conversion due to types + channel.addConnectListener(new ActionListener() { + @Override + public void onResponse(NioChannel nioChannel) { + connectListener.onResponse(nioChannel); + } + + @Override + public void onFailure(Exception e) { + connectListener.onFailure(e); + } + }); return channel; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java index fab6fa22c6b..5b365fbd36c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java @@ -19,6 +19,9 @@ package org.elasticsearch.transport.nio.channel; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.AcceptingSelector; import java.io.IOException; @@ -39,6 +42,11 @@ public class NioServerSocketChannel extends AbstractNioChannel listener) { + throw new UnsupportedOperationException("Cannot send a message to a server channel."); + } + @Override public String toString() { return "NioServerSocketChannel{" + diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index 5e4e3230941..520cefd27f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -20,6 +20,8 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.nio.NetworkBytesReference; import org.elasticsearch.transport.nio.SocketSelector; @@ -46,6 +48,22 @@ public class NioSocketChannel extends AbstractNioChannel { this.socketSelector = selector; } + @Override + public void sendMessage(BytesReference reference, ActionListener listener) { + // TODO: Temporary conversion due to types + writeContext.sendMessage(reference, new ActionListener() { + @Override + public void onResponse(NioChannel nioChannel) { + listener.onResponse(nioChannel); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + @Override public void closeFromSelector() throws IOException { assert socketSelector.isOnCurrentThread() : "Should only call from selector thread"; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index bbe3c13442c..6844b55cadc 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -40,7 +40,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) { @Override - protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel, TimeValue timeout) throws IOException, + protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, TimeValue timeout) throws IOException, InterruptedException { if (doHandshake) { return super.executeHandshake(node, mockChannel, timeout); @@ -58,8 +58,8 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { @Override protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { final MockTcpTransport t = (MockTcpTransport) transport; - @SuppressWarnings("unchecked") final TcpTransport.NodeChannels channels = - (TcpTransport.NodeChannels) connection; + @SuppressWarnings("unchecked") final TcpTransport.NodeChannels channels = + (TcpTransport.NodeChannels) connection; TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 04f1b424142..bc02a89a5c1 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.nio.channel.NioChannel; import java.io.IOException; import java.net.InetAddress; @@ -62,7 +61,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override - protected Version executeHandshake(DiscoveryNode node, NioChannel channel, TimeValue timeout) throws IOException, + protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, InterruptedException { if (doHandshake) { return super.executeHandshake(node, channel, timeout); @@ -100,7 +99,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { @Override protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { @SuppressWarnings("unchecked") - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; + TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); }