diff --git a/rest-api-spec/test/nodes.info/20_transport.yaml b/rest-api-spec/test/nodes.info/20_transport.yaml new file mode 100644 index 00000000000..a860f88a578 --- /dev/null +++ b/rest-api-spec/test/nodes.info/20_transport.yaml @@ -0,0 +1,9 @@ +--- +"node_info test profile is empty": + - do: + nodes.info: + metric: [ transport ] + +# there is no possbility to use is_true here due to unknown node_id +# which is part of the path, just checking for profiles in the body + - match: { $body: /profiles/ } diff --git a/src/main/java/org/elasticsearch/transport/Transport.java b/src/main/java/org/elasticsearch/transport/Transport.java index 35eb71d3e98..6264f3c3f36 100644 --- a/src/main/java/org/elasticsearch/transport/Transport.java +++ b/src/main/java/org/elasticsearch/transport/Transport.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; +import java.util.Map; /** * @@ -42,6 +43,12 @@ public interface Transport extends LifecycleComponent { */ BoundTransportAddress boundAddress(); + /** + * Further profile bound addresses + * @return Should return null if transport does not support profiles, otherwise a map with name of profile and its bound transport address + */ + Map profileBoundAddresses(); + /** * Returns an address from its string representation. */ @@ -78,5 +85,8 @@ public interface Transport extends LifecycleComponent { */ void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; + /** + * Returns count of currently open connections + */ long serverOpen(); } diff --git a/src/main/java/org/elasticsearch/transport/TransportInfo.java b/src/main/java/org/elasticsearch/transport/TransportInfo.java index fec3a953d8c..817d8a1332f 100644 --- a/src/main/java/org/elasticsearch/transport/TransportInfo.java +++ b/src/main/java/org/elasticsearch/transport/TransportInfo.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import com.google.common.collect.Maps; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -29,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import java.io.IOException; import java.io.Serializable; +import java.util.Map; /** * @@ -36,18 +39,21 @@ import java.io.Serializable; public class TransportInfo implements Streamable, Serializable, ToXContent { private BoundTransportAddress address; + private Map profileAddresses; TransportInfo() { } - public TransportInfo(BoundTransportAddress address) { + public TransportInfo(BoundTransportAddress address, @Nullable Map profileAddresses) { this.address = address; + this.profileAddresses = profileAddresses; } static final class Fields { static final XContentBuilderString TRANSPORT = new XContentBuilderString("transport"); static final XContentBuilderString BOUND_ADDRESS = new XContentBuilderString("bound_address"); static final XContentBuilderString PUBLISH_ADDRESS = new XContentBuilderString("publish_address"); + static final XContentBuilderString PROFILES = new XContentBuilderString("profiles"); } @Override @@ -55,6 +61,16 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { builder.startObject(Fields.TRANSPORT); builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString()); builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString()); + builder.startObject(Fields.PROFILES); + if (profileAddresses != null && profileAddresses.size() > 0) { + for (Map.Entry entry : profileAddresses.entrySet()) { + builder.startObject(entry.getKey()); + builder.field(Fields.BOUND_ADDRESS, entry.getValue().boundAddress().toString()); + builder.field(Fields.PUBLISH_ADDRESS, entry.getValue().publishAddress().toString()); + builder.endObject(); + } + } + builder.endObject(); builder.endObject(); return builder; } @@ -68,11 +84,31 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { address = BoundTransportAddress.readBoundTransportAddress(in); + int size = in.readVInt(); + if (size > 0) { + profileAddresses = Maps.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + String key = in.readString(); + BoundTransportAddress value = BoundTransportAddress.readBoundTransportAddress(in); + profileAddresses.put(key, value); + } + } } @Override public void writeTo(StreamOutput out) throws IOException { address.writeTo(out); + if (profileAddresses != null) { + out.writeVInt(profileAddresses.size()); + } else { + out.writeVInt(0); + } + if (profileAddresses != null && profileAddresses.size() > 0) { + for (Map.Entry entry : profileAddresses.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } + } } public BoundTransportAddress address() { @@ -82,4 +118,12 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { public BoundTransportAddress getAddress() { return address(); } + + public Map getProfileAddresses() { + return profileAddresses(); + } + + public Map profileAddresses() { + return profileAddresses; + } } diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 622636709c6..fa9fa7c164e 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -138,7 +138,7 @@ public class TransportService extends AbstractLifecycleComponent implem return boundAddress; } + @Override + public Map profileBoundAddresses() { + return Collections.EMPTY_MAP; + } + @Override public boolean nodeConnected(DiscoveryNode node) { return connectedNodes.containsKey(node); diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index e415de5118e..d3732d28d15 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -48,12 +48,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { protected final ThreadPool threadPool; protected final TransportServiceAdapter transportServiceAdapter; protected final NettyTransport transport; + protected final String profileName; - public MessageChannelHandler(NettyTransport transport, ESLogger logger) { + public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) { this.threadPool = transport.threadPool(); this.transportServiceAdapter = transport.transportServiceAdapter(); this.transport = transport; this.logger = logger; + this.profileName = profileName; } @Override @@ -203,7 +205,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString(); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, profileName); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index a46233cc338..155b716fe33 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -20,9 +20,9 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.elasticsearch.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; @@ -109,72 +109,40 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg"; public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state"; public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping"; - private static final String DEFAULT_PORT_RANGE = "9300-9400"; + public static final String DEFAULT_PORT_RANGE = "9300-9400"; + public static final String DEFAULT_PROFILE = "default"; - private final NetworkService networkService; - final Version version; + protected final NetworkService networkService; + protected final Version version; - private final boolean blockingClient; - private final TimeValue connectTimeout; - private final ByteSizeValue maxCumulationBufferCapacity; - private final int maxCompositeBufferComponents; - final boolean compress; - private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; - private final int workerCount; - private final ByteSizeValue receivePredictorMin; - private final ByteSizeValue receivePredictorMax; + protected final boolean blockingClient; + protected final TimeValue connectTimeout; + protected final ByteSizeValue maxCumulationBufferCapacity; + protected final int maxCompositeBufferComponents; + protected final boolean compress; + protected final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; + protected final int workerCount; + protected final ByteSizeValue receivePredictorMin; + protected final ByteSizeValue receivePredictorMax; - final int connectionsPerNodeRecovery; - final int connectionsPerNodeBulk; - final int connectionsPerNodeReg; - final int connectionsPerNodeState; - final int connectionsPerNodePing; + protected final int connectionsPerNodeRecovery; + protected final int connectionsPerNodeBulk; + protected final int connectionsPerNodeReg; + protected final int connectionsPerNodeState; + protected final int connectionsPerNodePing; - /* - final int workerCount; - final int bossCount; - - final boolean blockingServer; - - final String port; - - final String bindHost; - - final String publishHost; - - final int publishPort; - - final boolean compress; - - final TimeValue connectTimeout; - final String tcpNoDelay; - final String tcpKeepAlive; - final Boolean reuseAddress; - - final ByteSizeValue tcpSendBufferSize; - final ByteSizeValue tcpReceiveBufferSize; - - final int connectionsPerNodeRecovery; - final int connectionsPerNodeBulk; - final int connectionsPerNodeReg; - final int connectionsPerNodeState; - final int connectionsPerNodePing; - - final ByteSizeValue maxCumulationBufferCapacity; - final int maxCompositeBufferComponents; - */ - - final BigArrays bigArrays; - private final ThreadPool threadPool; - private volatile OpenChannelsHandler serverOpenChannels; - private volatile ClientBootstrap clientBootstrap; + protected final BigArrays bigArrays; + protected final ThreadPool threadPool; + protected volatile OpenChannelsHandler serverOpenChannels; + protected volatile ClientBootstrap clientBootstrap; // node id to actual channel - final ConcurrentMap connectedNodes = newConcurrentMap(); - private final Map serverBootstraps = newConcurrentMap(); - private final Map serverChannels = newConcurrentMap(); - private volatile TransportServiceAdapter transportServiceAdapter; - private volatile BoundTransportAddress boundAddress; - private final KeyedLock connectionLock = new KeyedLock<>(); + protected final ConcurrentMap connectedNodes = newConcurrentMap(); + protected final Map serverBootstraps = newConcurrentMap(); + protected final Map serverChannels = newConcurrentMap(); + protected final Map profileBoundAddresses = newConcurrentMap(); + protected volatile TransportServiceAdapter transportServiceAdapter; + protected volatile BoundTransportAddress boundAddress; + protected final KeyedLock connectionLock = new KeyedLock<>(); // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) @@ -263,20 +231,20 @@ public class NettyTransport extends AbstractLifecycleComponent implem // extract default profile first and create standard bootstrap Map profiles = settings.getGroups("transport.profiles", true); - if (!profiles.containsKey("default")) { + if (!profiles.containsKey(DEFAULT_PROFILE)) { profiles = Maps.newHashMap(profiles); - profiles.put("default", ImmutableSettings.EMPTY); + profiles.put(DEFAULT_PROFILE, ImmutableSettings.EMPTY); } Settings fallbackSettings = createFallbackSettings(); - Settings defaultSettings = profiles.get("default"); + Settings defaultSettings = profiles.get(DEFAULT_PROFILE); // loop through all profiles and strart them app, special handling for default one for (Map.Entry entry : profiles.entrySet()) { Settings profileSettings = entry.getValue(); String name = entry.getKey(); - if ("default".equals(name)) { + if (DEFAULT_PROFILE.equals(name)) { profileSettings = settingsBuilder() .put(profileSettings) .put("port", profileSettings.get("port", componentSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)))) @@ -300,19 +268,24 @@ public class NettyTransport extends AbstractLifecycleComponent implem bindServerBootstrap(name, mergedSettings); } - InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get("default").getLocalAddress(); - InetSocketAddress publishAddress; - int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", 0)); - if (0 == publishPort) { - publishPort = boundAddress.getPort(); - } + InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(DEFAULT_PROFILE).getLocalAddress(); + int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort())); + String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); + } + + @Override + public Map profileBoundAddresses() { + return ImmutableMap.copyOf(profileBoundAddresses); + } + + private InetSocketAddress createPublishAddress(String publishHost, int publishPort) { try { - String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); - publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort); + return new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort); } catch (Exception e) { throw new BindTransportException("Failed to resolve publish address", e); } - this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); } private ClientBootstrap createClientBootstrap() { @@ -431,6 +404,14 @@ public class NettyTransport extends AbstractLifecycleComponent implem throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); } + if (!DEFAULT_PROFILE.equals(name)) { + InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(name).getLocalAddress(); + int publishPort = settings.getAsInt("publish_port", boundAddress.getPort()); + String publishHost = settings.get("publish_host", boundAddress.getHostString()); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + profileBoundAddresses.put(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress))); + } + logger.debug("Bound profile [{}] to address [{}]", name, serverChannels.get(name).getLocalAddress()); } @@ -770,7 +751,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } - private NodeChannels connectToChannelsLight(DiscoveryNode node) { + protected NodeChannels connectToChannelsLight(DiscoveryNode node) { InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); ChannelFuture connect = clientBootstrap.connect(address); connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); @@ -783,7 +764,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem return new NodeChannels(channels, channels, channels, channels, channels); } - private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { + protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; @@ -900,7 +881,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem /** * Disconnects from a node, only if the relevant channel is found to be part of the node channels. */ - private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { + protected boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { // this might be called multiple times from all the node channels, so do a lightweight // check outside of the lock NodeChannels nodeChannels = connectedNodes.get(node); @@ -930,7 +911,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem /** * Disconnects from a node if a channel is found as part of that nodes channels. */ - private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) { + protected void disconnectFromNodeChannel(final Channel channel, final Throwable failure) { threadPool().generic().execute(new Runnable() { @Override @@ -946,7 +927,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem }); } - private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { + protected Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels == null) { throw new NodeNotConnectedException(node, "Node not connected"); @@ -980,7 +961,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); } channelPipeline.addLast("size", sizeHeader); - channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + // using a dot as a prefix means, this cannot come from any settings parsed + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client")); return channelPipeline; } } @@ -1017,12 +999,12 @@ public class NettyTransport extends AbstractLifecycleComponent implem sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); } channelPipeline.addLast("size", sizeHeader); - channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name)); return channelPipeline; } } - private class ChannelCloseListener implements ChannelFutureListener { + protected class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index e1d0b5b21ae..fbdb4e6fcce 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -48,13 +48,19 @@ public class NettyTransportChannel implements TransportChannel { private final String action; private final Channel channel; private final long requestId; + private final String profileName; - public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) { + public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version, String profileName) { this.version = version; this.transport = transport; this.action = action; this.channel = channel; this.requestId = requestId; + this.profileName = profileName; + } + + public String getProfileName() { + return profileName; } @Override diff --git a/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index c4db0e68271..ce1679e8322 100644 --- a/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -35,6 +35,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -188,4 +190,9 @@ abstract class FailAndRetryMockTransport imp public void close() throws ElasticsearchException { } + + @Override + public Map profileBoundAddresses() { + return Collections.EMPTY_MAP; + } } diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index 37f7fb92d01..fb8f033ee39 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -39,6 +39,7 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -403,5 +404,10 @@ public class MockTransportService extends TransportService { public void close() throws ElasticsearchException { transport.close(); } + + @Override + public Map profileBoundAddresses() { + return transport.profileBoundAddresses(); + } } } diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java index 4a9c1d20461..171ca88800f 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java @@ -20,10 +20,13 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.Network; import org.elasticsearch.transport.TransportModule; import org.junit.Test; @@ -32,7 +35,7 @@ import java.util.Locale; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, enableRandomBenchNodes = false, numClientNodes = 0) public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegrationTest { @@ -52,6 +55,8 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr .put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransport.class.getName()) .put("node.mode", "network") .put("transport.profiles.client1.port", randomPortRange) + .put("transport.profiles.client1.publish_host", "10.0.254.253") + .put("transport.profiles.client1.publish_port", "4321") .put("transport.profiles.client1.reuse_address", true) .build(); } @@ -68,4 +73,25 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN)); } } + + @Test + @Network + public void testThatInfosAreExposed() throws Exception { + NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().clear().setTransport(true).get(); + for (NodeInfo nodeInfo : response.getNodes()) { + assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1)); + assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1")); + assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(), instanceOf(InetSocketTransportAddress.class)); + + // bound address + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(); + assertThat(inetSocketTransportAddress.address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); + + // publish address + assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class)); + InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(); + assertThat(publishAddress.address().getHostName(), is("10.0.254.253")); + assertThat(publishAddress.address().getPort(), is(4321)); + } + } } diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java index f6ae551b195..db29cb7544f 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java @@ -60,7 +60,6 @@ public class NettyTransportMultiPortTests extends ElasticsearchTestCase { if (threadPool != null) { threadPool.shutdownNow(); } - } @Test diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java index 7e7f9b68883..93b6a92b496 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java @@ -58,6 +58,9 @@ import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, numDataNodes = 1) public class NettyTransportTests extends ElasticsearchIntegrationTest { + // static so we can use it in anonymous classes + private static String channelProfileName = null; + @Override protected Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) @@ -76,6 +79,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { fail("Expected exception, but didnt happen"); } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("MY MESSAGE")); + assertThat(channelProfileName, is(NettyTransport.DEFAULT_PROFILE)); } } @@ -102,13 +106,13 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = super.getPipeline(); - pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { + pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, NettyTransport.DEFAULT_PROFILE) { @Override protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString(); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, name); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { @@ -134,6 +138,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { logger.warn("Actual Exception", e1); } } + channelProfileName = transportChannel.getProfileName(); return action; }