Pass DiscoveryNode to initiateChannel (#32958)

This is related to #32517. This commit passes the DiscoveryNode to the
initiateChannel method for different Transport implementation. This
will allow additional attributes (besides just the socket address) to be
used when opening channels.
This commit is contained in:
Tim Brooks 2018-08-20 08:54:55 -06:00 committed by GitHub
parent eef0e35913
commit faa42de66d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 16 additions and 8 deletions

View File

@ -39,6 +39,7 @@ import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -222,7 +223,8 @@ public class Netty4Transport extends TcpTransport {
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel"); static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
@Override @Override
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException { protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
InetSocketAddress address = node.getAddress().address();
ChannelFuture channelFuture = bootstrap.connect(address); ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel(); Channel channel = channelFuture.channel();
if (channel == null) { if (channel == null) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.recycler.Recycler;
@ -82,7 +83,8 @@ public class NioTransport extends TcpTransport {
} }
@Override @Override
protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException { protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory); NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel; return channel;

View File

@ -441,7 +441,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
try { try {
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
connectionFutures.add(connectFuture); connectionFutures.add(connectFuture);
TcpChannel channel = initiateChannel(node.getAddress().address(), connectFuture); TcpChannel channel = initiateChannel(node, connectFuture);
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel)); logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
channels.add(channel); channels.add(channel);
} catch (Exception e) { } catch (Exception e) {
@ -841,12 +841,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
/** /**
* Initiate a single tcp socket channel. * Initiate a single tcp socket channel.
* *
* @param address address for the initiated connection * @param node for the initiated connection
* @param connectListener listener to be called when connection complete * @param connectListener listener to be called when connection complete
* @return the pending connection * @return the pending connection
* @throws IOException if an I/O exception occurs while opening the channel * @throws IOException if an I/O exception occurs while opening the channel
*/ */
protected abstract TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException; protected abstract TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException;
/** /**
* Called to tear down internal resources * Called to tear down internal resources

View File

@ -188,7 +188,7 @@ public class TcpTransportTests extends ESTestCase {
} }
@Override @Override
protected FakeChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException { protected FakeChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
return new FakeChannel(messageCaptor); return new FakeChannel(messageCaptor);
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.cli.SuppressForbidden; import org.elasticsearch.cli.SuppressForbidden;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -162,7 +163,8 @@ public class MockTcpTransport extends TcpTransport {
@Override @Override
@SuppressForbidden(reason = "real socket for mocking remote connections") @SuppressForbidden(reason = "real socket for mocking remote connections")
protected MockChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException { protected MockChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
final MockSocket socket = new MockSocket(); final MockSocket socket = new MockSocket();
final MockChannel channel = new MockChannel(socket, address, "none"); final MockChannel channel = new MockChannel(socket, address, "none");

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
@ -85,7 +86,8 @@ public class MockNioTransport extends TcpTransport {
} }
@Override @Override
protected MockSocketChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException { protected MockSocketChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
InetSocketAddress address = node.getAddress().address();
MockSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory); MockSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel; return channel;