From 59f8c0951afa72c482500a1265456d2dfd885a97 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Mon, 2 Feb 2015 08:17:54 +0100 Subject: [PATCH] Netty Transport: Add profiles to transport infos Until now, there was no possibility to expose infos about configured transport profiles. This commit adds the ability to expose those information in the TransportInfo class. The channel was well as the netty pipeline handler now also contain the profile they were configured for, as this information cannot be extracted elsewhere. In addition, each profile now can set its own publish host and port, which might be needed in case of portforwarding or using docker. Closes #9134 --- .../test/nodes.info/20_transport.yaml | 9 ++ .../elasticsearch/transport/Transport.java | 10 ++ .../transport/TransportInfo.java | 46 +++++- .../transport/TransportService.java | 2 +- .../transport/local/LocalTransport.java | 22 +-- .../netty/MessageChannelHandler.java | 6 +- .../transport/netty/NettyTransport.java | 148 ++++++++---------- .../netty/NettyTransportChannel.java | 8 +- .../transport/FailAndRetryMockTransport.java | 7 + .../test/transport/MockTransportService.java | 6 + ...ttyTransportMultiPortIntegrationTests.java | 28 +++- .../netty/NettyTransportMultiPortTests.java | 1 - .../transport/netty/NettyTransportTests.java | 9 +- 13 files changed, 195 insertions(+), 107 deletions(-) create mode 100644 rest-api-spec/test/nodes.info/20_transport.yaml diff --git a/rest-api-spec/test/nodes.info/20_transport.yaml b/rest-api-spec/test/nodes.info/20_transport.yaml new file mode 100644 index 00000000000..a860f88a578 --- /dev/null +++ b/rest-api-spec/test/nodes.info/20_transport.yaml @@ -0,0 +1,9 @@ +--- +"node_info test profile is empty": + - do: + nodes.info: + metric: [ transport ] + +# there is no possbility to use is_true here due to unknown node_id +# which is part of the path, just checking for profiles in the body + - match: { $body: /profiles/ } diff --git a/src/main/java/org/elasticsearch/transport/Transport.java b/src/main/java/org/elasticsearch/transport/Transport.java index 35eb71d3e98..6264f3c3f36 100644 --- a/src/main/java/org/elasticsearch/transport/Transport.java +++ b/src/main/java/org/elasticsearch/transport/Transport.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; +import java.util.Map; /** * @@ -42,6 +43,12 @@ public interface Transport extends LifecycleComponent { */ BoundTransportAddress boundAddress(); + /** + * Further profile bound addresses + * @return Should return null if transport does not support profiles, otherwise a map with name of profile and its bound transport address + */ + Map profileBoundAddresses(); + /** * Returns an address from its string representation. */ @@ -78,5 +85,8 @@ public interface Transport extends LifecycleComponent { */ void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException; + /** + * Returns count of currently open connections + */ long serverOpen(); } diff --git a/src/main/java/org/elasticsearch/transport/TransportInfo.java b/src/main/java/org/elasticsearch/transport/TransportInfo.java index fec3a953d8c..817d8a1332f 100644 --- a/src/main/java/org/elasticsearch/transport/TransportInfo.java +++ b/src/main/java/org/elasticsearch/transport/TransportInfo.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import com.google.common.collect.Maps; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -29,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import java.io.IOException; import java.io.Serializable; +import java.util.Map; /** * @@ -36,18 +39,21 @@ import java.io.Serializable; public class TransportInfo implements Streamable, Serializable, ToXContent { private BoundTransportAddress address; + private Map profileAddresses; TransportInfo() { } - public TransportInfo(BoundTransportAddress address) { + public TransportInfo(BoundTransportAddress address, @Nullable Map profileAddresses) { this.address = address; + this.profileAddresses = profileAddresses; } static final class Fields { static final XContentBuilderString TRANSPORT = new XContentBuilderString("transport"); static final XContentBuilderString BOUND_ADDRESS = new XContentBuilderString("bound_address"); static final XContentBuilderString PUBLISH_ADDRESS = new XContentBuilderString("publish_address"); + static final XContentBuilderString PROFILES = new XContentBuilderString("profiles"); } @Override @@ -55,6 +61,16 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { builder.startObject(Fields.TRANSPORT); builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString()); builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString()); + builder.startObject(Fields.PROFILES); + if (profileAddresses != null && profileAddresses.size() > 0) { + for (Map.Entry entry : profileAddresses.entrySet()) { + builder.startObject(entry.getKey()); + builder.field(Fields.BOUND_ADDRESS, entry.getValue().boundAddress().toString()); + builder.field(Fields.PUBLISH_ADDRESS, entry.getValue().publishAddress().toString()); + builder.endObject(); + } + } + builder.endObject(); builder.endObject(); return builder; } @@ -68,11 +84,31 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { address = BoundTransportAddress.readBoundTransportAddress(in); + int size = in.readVInt(); + if (size > 0) { + profileAddresses = Maps.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) { + String key = in.readString(); + BoundTransportAddress value = BoundTransportAddress.readBoundTransportAddress(in); + profileAddresses.put(key, value); + } + } } @Override public void writeTo(StreamOutput out) throws IOException { address.writeTo(out); + if (profileAddresses != null) { + out.writeVInt(profileAddresses.size()); + } else { + out.writeVInt(0); + } + if (profileAddresses != null && profileAddresses.size() > 0) { + for (Map.Entry entry : profileAddresses.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } + } } public BoundTransportAddress address() { @@ -82,4 +118,12 @@ public class TransportInfo implements Streamable, Serializable, ToXContent { public BoundTransportAddress getAddress() { return address(); } + + public Map getProfileAddresses() { + return profileAddresses(); + } + + public Map profileAddresses() { + return profileAddresses; + } } diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 622636709c6..fa9fa7c164e 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -138,7 +138,7 @@ public class TransportService extends AbstractLifecycleComponent implem return boundAddress; } + @Override + public Map profileBoundAddresses() { + return Collections.EMPTY_MAP; + } + @Override public boolean nodeConnected(DiscoveryNode node) { return connectedNodes.containsKey(node); diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index e415de5118e..d3732d28d15 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -48,12 +48,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { protected final ThreadPool threadPool; protected final TransportServiceAdapter transportServiceAdapter; protected final NettyTransport transport; + protected final String profileName; - public MessageChannelHandler(NettyTransport transport, ESLogger logger) { + public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) { this.threadPool = transport.threadPool(); this.transportServiceAdapter = transport.transportServiceAdapter(); this.transport = transport; this.logger = logger; + this.profileName = profileName; } @Override @@ -203,7 +205,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString(); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, profileName); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index a46233cc338..155b716fe33 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -20,9 +20,9 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.elasticsearch.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; @@ -109,72 +109,40 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg"; public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state"; public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping"; - private static final String DEFAULT_PORT_RANGE = "9300-9400"; + public static final String DEFAULT_PORT_RANGE = "9300-9400"; + public static final String DEFAULT_PROFILE = "default"; - private final NetworkService networkService; - final Version version; + protected final NetworkService networkService; + protected final Version version; - private final boolean blockingClient; - private final TimeValue connectTimeout; - private final ByteSizeValue maxCumulationBufferCapacity; - private final int maxCompositeBufferComponents; - final boolean compress; - private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; - private final int workerCount; - private final ByteSizeValue receivePredictorMin; - private final ByteSizeValue receivePredictorMax; + protected final boolean blockingClient; + protected final TimeValue connectTimeout; + protected final ByteSizeValue maxCumulationBufferCapacity; + protected final int maxCompositeBufferComponents; + protected final boolean compress; + protected final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; + protected final int workerCount; + protected final ByteSizeValue receivePredictorMin; + protected final ByteSizeValue receivePredictorMax; - final int connectionsPerNodeRecovery; - final int connectionsPerNodeBulk; - final int connectionsPerNodeReg; - final int connectionsPerNodeState; - final int connectionsPerNodePing; + protected final int connectionsPerNodeRecovery; + protected final int connectionsPerNodeBulk; + protected final int connectionsPerNodeReg; + protected final int connectionsPerNodeState; + protected final int connectionsPerNodePing; - /* - final int workerCount; - final int bossCount; - - final boolean blockingServer; - - final String port; - - final String bindHost; - - final String publishHost; - - final int publishPort; - - final boolean compress; - - final TimeValue connectTimeout; - final String tcpNoDelay; - final String tcpKeepAlive; - final Boolean reuseAddress; - - final ByteSizeValue tcpSendBufferSize; - final ByteSizeValue tcpReceiveBufferSize; - - final int connectionsPerNodeRecovery; - final int connectionsPerNodeBulk; - final int connectionsPerNodeReg; - final int connectionsPerNodeState; - final int connectionsPerNodePing; - - final ByteSizeValue maxCumulationBufferCapacity; - final int maxCompositeBufferComponents; - */ - - final BigArrays bigArrays; - private final ThreadPool threadPool; - private volatile OpenChannelsHandler serverOpenChannels; - private volatile ClientBootstrap clientBootstrap; + protected final BigArrays bigArrays; + protected final ThreadPool threadPool; + protected volatile OpenChannelsHandler serverOpenChannels; + protected volatile ClientBootstrap clientBootstrap; // node id to actual channel - final ConcurrentMap connectedNodes = newConcurrentMap(); - private final Map serverBootstraps = newConcurrentMap(); - private final Map serverChannels = newConcurrentMap(); - private volatile TransportServiceAdapter transportServiceAdapter; - private volatile BoundTransportAddress boundAddress; - private final KeyedLock connectionLock = new KeyedLock<>(); + protected final ConcurrentMap connectedNodes = newConcurrentMap(); + protected final Map serverBootstraps = newConcurrentMap(); + protected final Map serverChannels = newConcurrentMap(); + protected final Map profileBoundAddresses = newConcurrentMap(); + protected volatile TransportServiceAdapter transportServiceAdapter; + protected volatile BoundTransportAddress boundAddress; + protected final KeyedLock connectionLock = new KeyedLock<>(); // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) @@ -263,20 +231,20 @@ public class NettyTransport extends AbstractLifecycleComponent implem // extract default profile first and create standard bootstrap Map profiles = settings.getGroups("transport.profiles", true); - if (!profiles.containsKey("default")) { + if (!profiles.containsKey(DEFAULT_PROFILE)) { profiles = Maps.newHashMap(profiles); - profiles.put("default", ImmutableSettings.EMPTY); + profiles.put(DEFAULT_PROFILE, ImmutableSettings.EMPTY); } Settings fallbackSettings = createFallbackSettings(); - Settings defaultSettings = profiles.get("default"); + Settings defaultSettings = profiles.get(DEFAULT_PROFILE); // loop through all profiles and strart them app, special handling for default one for (Map.Entry entry : profiles.entrySet()) { Settings profileSettings = entry.getValue(); String name = entry.getKey(); - if ("default".equals(name)) { + if (DEFAULT_PROFILE.equals(name)) { profileSettings = settingsBuilder() .put(profileSettings) .put("port", profileSettings.get("port", componentSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)))) @@ -300,19 +268,24 @@ public class NettyTransport extends AbstractLifecycleComponent implem bindServerBootstrap(name, mergedSettings); } - InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get("default").getLocalAddress(); - InetSocketAddress publishAddress; - int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", 0)); - if (0 == publishPort) { - publishPort = boundAddress.getPort(); - } + InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(DEFAULT_PROFILE).getLocalAddress(); + int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort())); + String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); + } + + @Override + public Map profileBoundAddresses() { + return ImmutableMap.copyOf(profileBoundAddresses); + } + + private InetSocketAddress createPublishAddress(String publishHost, int publishPort) { try { - String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); - publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort); + return new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort); } catch (Exception e) { throw new BindTransportException("Failed to resolve publish address", e); } - this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); } private ClientBootstrap createClientBootstrap() { @@ -431,6 +404,14 @@ public class NettyTransport extends AbstractLifecycleComponent implem throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); } + if (!DEFAULT_PROFILE.equals(name)) { + InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(name).getLocalAddress(); + int publishPort = settings.getAsInt("publish_port", boundAddress.getPort()); + String publishHost = settings.get("publish_host", boundAddress.getHostString()); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + profileBoundAddresses.put(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress))); + } + logger.debug("Bound profile [{}] to address [{}]", name, serverChannels.get(name).getLocalAddress()); } @@ -770,7 +751,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } - private NodeChannels connectToChannelsLight(DiscoveryNode node) { + protected NodeChannels connectToChannelsLight(DiscoveryNode node) { InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); ChannelFuture connect = clientBootstrap.connect(address); connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); @@ -783,7 +764,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem return new NodeChannels(channels, channels, channels, channels, channels); } - private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { + protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; @@ -900,7 +881,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem /** * Disconnects from a node, only if the relevant channel is found to be part of the node channels. */ - private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { + protected boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) { // this might be called multiple times from all the node channels, so do a lightweight // check outside of the lock NodeChannels nodeChannels = connectedNodes.get(node); @@ -930,7 +911,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem /** * Disconnects from a node if a channel is found as part of that nodes channels. */ - private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) { + protected void disconnectFromNodeChannel(final Channel channel, final Throwable failure) { threadPool().generic().execute(new Runnable() { @Override @@ -946,7 +927,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem }); } - private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { + protected Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels == null) { throw new NodeNotConnectedException(node, "Node not connected"); @@ -980,7 +961,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); } channelPipeline.addLast("size", sizeHeader); - channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + // using a dot as a prefix means, this cannot come from any settings parsed + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client")); return channelPipeline; } } @@ -1017,12 +999,12 @@ public class NettyTransport extends AbstractLifecycleComponent implem sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); } channelPipeline.addLast("size", sizeHeader); - channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name)); return channelPipeline; } } - private class ChannelCloseListener implements ChannelFutureListener { + protected class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index e1d0b5b21ae..fbdb4e6fcce 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -48,13 +48,19 @@ public class NettyTransportChannel implements TransportChannel { private final String action; private final Channel channel; private final long requestId; + private final String profileName; - public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) { + public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version, String profileName) { this.version = version; this.transport = transport; this.action = action; this.channel = channel; this.requestId = requestId; + this.profileName = profileName; + } + + public String getProfileName() { + return profileName; } @Override diff --git a/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index c4db0e68271..ce1679e8322 100644 --- a/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -35,6 +35,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -188,4 +190,9 @@ abstract class FailAndRetryMockTransport imp public void close() throws ElasticsearchException { } + + @Override + public Map profileBoundAddresses() { + return Collections.EMPTY_MAP; + } } diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index 37f7fb92d01..fb8f033ee39 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -39,6 +39,7 @@ import org.elasticsearch.transport.*; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -403,5 +404,10 @@ public class MockTransportService extends TransportService { public void close() throws ElasticsearchException { transport.close(); } + + @Override + public Map profileBoundAddresses() { + return transport.profileBoundAddresses(); + } } } diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java index 4a9c1d20461..171ca88800f 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationTests.java @@ -20,10 +20,13 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.junit.annotations.Network; import org.elasticsearch.transport.TransportModule; import org.junit.Test; @@ -32,7 +35,7 @@ import java.util.Locale; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.SUITE, numDataNodes = 1, enableRandomBenchNodes = false, numClientNodes = 0) public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegrationTest { @@ -52,6 +55,8 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr .put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransport.class.getName()) .put("node.mode", "network") .put("transport.profiles.client1.port", randomPortRange) + .put("transport.profiles.client1.publish_host", "10.0.254.253") + .put("transport.profiles.client1.publish_port", "4321") .put("transport.profiles.client1.reuse_address", true) .build(); } @@ -68,4 +73,25 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN)); } } + + @Test + @Network + public void testThatInfosAreExposed() throws Exception { + NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().clear().setTransport(true).get(); + for (NodeInfo nodeInfo : response.getNodes()) { + assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1)); + assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1")); + assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(), instanceOf(InetSocketTransportAddress.class)); + + // bound address + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(); + assertThat(inetSocketTransportAddress.address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); + + // publish address + assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class)); + InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(); + assertThat(publishAddress.address().getHostName(), is("10.0.254.253")); + assertThat(publishAddress.address().getPort(), is(4321)); + } + } } diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java index f6ae551b195..db29cb7544f 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java @@ -60,7 +60,6 @@ public class NettyTransportMultiPortTests extends ElasticsearchTestCase { if (threadPool != null) { threadPool.shutdownNow(); } - } @Test diff --git a/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java b/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java index 7e7f9b68883..93b6a92b496 100644 --- a/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java @@ -58,6 +58,9 @@ import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, numDataNodes = 1) public class NettyTransportTests extends ElasticsearchIntegrationTest { + // static so we can use it in anonymous classes + private static String channelProfileName = null; + @Override protected Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) @@ -76,6 +79,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { fail("Expected exception, but didnt happen"); } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("MY MESSAGE")); + assertThat(channelProfileName, is(NettyTransport.DEFAULT_PROFILE)); } } @@ -102,13 +106,13 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = super.getPipeline(); - pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { + pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, NettyTransport.DEFAULT_PROFILE) { @Override protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString(); - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, name); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action); if (handler == null) { @@ -134,6 +138,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { logger.warn("Actual Exception", e1); } } + channelProfileName = transportChannel.getProfileName(); return action; }