Netty Transport: Add profiles to transport infos

Until now, there was no possibility to expose infos about configured
transport profiles. This commit adds the ability to expose those
information in the TransportInfo class.

The channel was well as the netty pipeline handler now also contain
the profile they were configured for, as this information cannot be
extracted elsewhere.

In addition, each profile now can set its own publish host and port,
which might be needed in case of portforwarding or using docker.

Closes #9134
This commit is contained in:
Alexander Reelsen 2015-02-02 08:17:54 +01:00
parent 3ce05b6919
commit 59f8c0951a
13 changed files with 195 additions and 107 deletions

View File

@ -0,0 +1,9 @@
---
"node_info test profile is empty":
- do:
nodes.info:
metric: [ transport ]
# there is no possbility to use is_true here due to unknown node_id
# which is part of the path, just checking for profiles in the body
- match: { $body: /profiles/ }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException;
import java.util.Map;
/**
*
@ -42,6 +43,12 @@ public interface Transport extends LifecycleComponent<Transport> {
*/
BoundTransportAddress boundAddress();
/**
* Further profile bound addresses
* @return Should return null if transport does not support profiles, otherwise a map with name of profile and its bound transport address
*/
Map<String, BoundTransportAddress> profileBoundAddresses();
/**
* Returns an address from its string representation.
*/
@ -78,5 +85,8 @@ public interface Transport extends LifecycleComponent<Transport> {
*/
void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException;
/**
* Returns count of currently open connections
*/
long serverOpen();
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.transport;
import com.google.common.collect.Maps;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -29,6 +31,7 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
/**
*
@ -36,18 +39,21 @@ import java.io.Serializable;
public class TransportInfo implements Streamable, Serializable, ToXContent {
private BoundTransportAddress address;
private Map<String, BoundTransportAddress> profileAddresses;
TransportInfo() {
}
public TransportInfo(BoundTransportAddress address) {
public TransportInfo(BoundTransportAddress address, @Nullable Map<String, BoundTransportAddress> profileAddresses) {
this.address = address;
this.profileAddresses = profileAddresses;
}
static final class Fields {
static final XContentBuilderString TRANSPORT = new XContentBuilderString("transport");
static final XContentBuilderString BOUND_ADDRESS = new XContentBuilderString("bound_address");
static final XContentBuilderString PUBLISH_ADDRESS = new XContentBuilderString("publish_address");
static final XContentBuilderString PROFILES = new XContentBuilderString("profiles");
}
@Override
@ -55,6 +61,16 @@ public class TransportInfo implements Streamable, Serializable, ToXContent {
builder.startObject(Fields.TRANSPORT);
builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString());
builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString());
builder.startObject(Fields.PROFILES);
if (profileAddresses != null && profileAddresses.size() > 0) {
for (Map.Entry<String, BoundTransportAddress> entry : profileAddresses.entrySet()) {
builder.startObject(entry.getKey());
builder.field(Fields.BOUND_ADDRESS, entry.getValue().boundAddress().toString());
builder.field(Fields.PUBLISH_ADDRESS, entry.getValue().publishAddress().toString());
builder.endObject();
}
}
builder.endObject();
builder.endObject();
return builder;
}
@ -68,11 +84,31 @@ public class TransportInfo implements Streamable, Serializable, ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
address = BoundTransportAddress.readBoundTransportAddress(in);
int size = in.readVInt();
if (size > 0) {
profileAddresses = Maps.newHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
String key = in.readString();
BoundTransportAddress value = BoundTransportAddress.readBoundTransportAddress(in);
profileAddresses.put(key, value);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
address.writeTo(out);
if (profileAddresses != null) {
out.writeVInt(profileAddresses.size());
} else {
out.writeVInt(0);
}
if (profileAddresses != null && profileAddresses.size() > 0) {
for (Map.Entry<String, BoundTransportAddress> entry : profileAddresses.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
}
}
public BoundTransportAddress address() {
@ -82,4 +118,12 @@ public class TransportInfo implements Streamable, Serializable, ToXContent {
public BoundTransportAddress getAddress() {
return address();
}
public Map<String, BoundTransportAddress> getProfileAddresses() {
return profileAddresses();
}
public Map<String, BoundTransportAddress> profileAddresses() {
return profileAddresses;
}
}

View File

@ -138,7 +138,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
if (boundTransportAddress == null) {
return null;
}
return new TransportInfo(boundTransportAddress);
return new TransportInfo(boundTransportAddress, transport.profileBoundAddresses());
}
public TransportStats stats() {

View File

@ -37,24 +37,11 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
@ -150,6 +137,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return boundAddress;
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return Collections.EMPTY_MAP;
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node);

View File

@ -48,12 +48,14 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected final ThreadPool threadPool;
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
protected final String profileName;
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
this.threadPool = transport.threadPool();
this.transportServiceAdapter = transport.transportServiceAdapter();
this.transport = transport;
this.logger = logger;
this.profileName = profileName;
}
@Override
@ -203,7 +205,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, profileName);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {

View File

@ -20,9 +20,9 @@
package org.elasticsearch.transport.netty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
@ -109,72 +109,40 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final String CONNECTIONS_PER_NODE_REG = "transport.connections_per_node.reg";
public static final String CONNECTIONS_PER_NODE_STATE = "transport.connections_per_node.state";
public static final String CONNECTIONS_PER_NODE_PING = "transport.connections_per_node.ping";
private static final String DEFAULT_PORT_RANGE = "9300-9400";
public static final String DEFAULT_PORT_RANGE = "9300-9400";
public static final String DEFAULT_PROFILE = "default";
private final NetworkService networkService;
final Version version;
protected final NetworkService networkService;
protected final Version version;
private final boolean blockingClient;
private final TimeValue connectTimeout;
private final ByteSizeValue maxCumulationBufferCapacity;
private final int maxCompositeBufferComponents;
final boolean compress;
private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
private final int workerCount;
private final ByteSizeValue receivePredictorMin;
private final ByteSizeValue receivePredictorMax;
protected final boolean blockingClient;
protected final TimeValue connectTimeout;
protected final ByteSizeValue maxCumulationBufferCapacity;
protected final int maxCompositeBufferComponents;
protected final boolean compress;
protected final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
protected final int workerCount;
protected final ByteSizeValue receivePredictorMin;
protected final ByteSizeValue receivePredictorMax;
final int connectionsPerNodeRecovery;
final int connectionsPerNodeBulk;
final int connectionsPerNodeReg;
final int connectionsPerNodeState;
final int connectionsPerNodePing;
protected final int connectionsPerNodeRecovery;
protected final int connectionsPerNodeBulk;
protected final int connectionsPerNodeReg;
protected final int connectionsPerNodeState;
protected final int connectionsPerNodePing;
/*
final int workerCount;
final int bossCount;
final boolean blockingServer;
final String port;
final String bindHost;
final String publishHost;
final int publishPort;
final boolean compress;
final TimeValue connectTimeout;
final String tcpNoDelay;
final String tcpKeepAlive;
final Boolean reuseAddress;
final ByteSizeValue tcpSendBufferSize;
final ByteSizeValue tcpReceiveBufferSize;
final int connectionsPerNodeRecovery;
final int connectionsPerNodeBulk;
final int connectionsPerNodeReg;
final int connectionsPerNodeState;
final int connectionsPerNodePing;
final ByteSizeValue maxCumulationBufferCapacity;
final int maxCompositeBufferComponents;
*/
final BigArrays bigArrays;
private final ThreadPool threadPool;
private volatile OpenChannelsHandler serverOpenChannels;
private volatile ClientBootstrap clientBootstrap;
protected final BigArrays bigArrays;
protected final ThreadPool threadPool;
protected volatile OpenChannelsHandler serverOpenChannels;
protected volatile ClientBootstrap clientBootstrap;
// node id to actual channel
final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
private final Map<String, Channel> serverChannels = newConcurrentMap();
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
private final KeyedLock<String> connectionLock = new KeyedLock<>();
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();
protected final Map<String, Channel> serverChannels = newConcurrentMap();
protected final Map<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
protected volatile TransportServiceAdapter transportServiceAdapter;
protected volatile BoundTransportAddress boundAddress;
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
@ -263,20 +231,20 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// extract default profile first and create standard bootstrap
Map<String, Settings> profiles = settings.getGroups("transport.profiles", true);
if (!profiles.containsKey("default")) {
if (!profiles.containsKey(DEFAULT_PROFILE)) {
profiles = Maps.newHashMap(profiles);
profiles.put("default", ImmutableSettings.EMPTY);
profiles.put(DEFAULT_PROFILE, ImmutableSettings.EMPTY);
}
Settings fallbackSettings = createFallbackSettings();
Settings defaultSettings = profiles.get("default");
Settings defaultSettings = profiles.get(DEFAULT_PROFILE);
// loop through all profiles and strart them app, special handling for default one
for (Map.Entry<String, Settings> entry : profiles.entrySet()) {
Settings profileSettings = entry.getValue();
String name = entry.getKey();
if ("default".equals(name)) {
if (DEFAULT_PROFILE.equals(name)) {
profileSettings = settingsBuilder()
.put(profileSettings)
.put("port", profileSettings.get("port", componentSettings.get("port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE))))
@ -300,19 +268,24 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
bindServerBootstrap(name, mergedSettings);
}
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get("default").getLocalAddress();
InetSocketAddress publishAddress;
int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", 0));
if (0 == publishPort) {
publishPort = boundAddress.getPort();
}
try {
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(DEFAULT_PROFILE).getLocalAddress();
int publishPort = componentSettings.getAsInt("publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort()));
String publishHost = componentSettings.get("publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort);
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return ImmutableMap.copyOf(profileBoundAddresses);
}
private InetSocketAddress createPublishAddress(String publishHost, int publishPort) {
try {
return new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), publishPort);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
private ClientBootstrap createClientBootstrap() {
@ -431,6 +404,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
if (!DEFAULT_PROFILE.equals(name)) {
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(name).getLocalAddress();
int publishPort = settings.getAsInt("publish_port", boundAddress.getPort());
String publishHost = settings.get("publish_host", boundAddress.getHostString());
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
profileBoundAddresses.put(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)));
}
logger.debug("Bound profile [{}] to address [{}]", name, serverChannels.get(name).getLocalAddress());
}
@ -770,7 +751,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
private NodeChannels connectToChannelsLight(DiscoveryNode node) {
protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connect = clientBootstrap.connect(address);
connect.awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
@ -783,7 +764,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return new NodeChannels(channels, channels, channels, channels, channels);
}
private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];
ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];
ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];
@ -900,7 +881,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
/**
* Disconnects from a node, only if the relevant channel is found to be part of the node channels.
*/
private boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
protected boolean disconnectFromNode(DiscoveryNode node, Channel channel, String reason) {
// this might be called multiple times from all the node channels, so do a lightweight
// check outside of the lock
NodeChannels nodeChannels = connectedNodes.get(node);
@ -930,7 +911,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
/**
* Disconnects from a node if a channel is found as part of that nodes channels.
*/
private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) {
protected void disconnectFromNodeChannel(final Channel channel, final Throwable failure) {
threadPool().generic().execute(new Runnable() {
@Override
@ -946,7 +927,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
});
}
private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
protected Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
@ -980,7 +961,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger));
// using a dot as a prefix means, this cannot come from any settings parsed
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, ".client"));
return channelPipeline;
}
}
@ -1017,12 +999,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger));
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name));
return channelPipeline;
}
}
private class ChannelCloseListener implements ChannelFutureListener {
protected class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node;

View File

@ -48,13 +48,19 @@ public class NettyTransportChannel implements TransportChannel {
private final String action;
private final Channel channel;
private final long requestId;
private final String profileName;
public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version) {
public NettyTransportChannel(NettyTransport transport, String action, Channel channel, long requestId, Version version, String profileName) {
this.version = version;
this.transport = transport;
this.action = action;
this.channel = channel;
this.requestId = requestId;
this.profileName = profileName;
}
public String getProfileName() {
return profileName;
}
@Override

View File

@ -35,6 +35,8 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ -188,4 +190,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
public void close() throws ElasticsearchException {
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return Collections.EMPTY_MAP;
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -403,5 +404,10 @@ public class MockTransportService extends TransportService {
public void close() throws ElasticsearchException {
transport.close();
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return transport.profileBoundAddresses();
}
}
}

View File

@ -20,10 +20,13 @@ package org.elasticsearch.transport.netty;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
@ -32,7 +35,7 @@ import java.util.Locale;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, enableRandomBenchNodes = false, numClientNodes = 0)
public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegrationTest {
@ -52,6 +55,8 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr
.put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransport.class.getName())
.put("node.mode", "network")
.put("transport.profiles.client1.port", randomPortRange)
.put("transport.profiles.client1.publish_host", "10.0.254.253")
.put("transport.profiles.client1.publish_port", "4321")
.put("transport.profiles.client1.reuse_address", true)
.build();
}
@ -68,4 +73,25 @@ public class NettyTransportMultiPortIntegrationTests extends ElasticsearchIntegr
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
}
}
@Test
@Network
public void testThatInfosAreExposed() throws Exception {
NodesInfoResponse response = client().admin().cluster().prepareNodesInfo().clear().setTransport(true).get();
for (NodeInfo nodeInfo : response.getNodes()) {
assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1));
assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1"));
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(), instanceOf(InetSocketTransportAddress.class));
// bound address
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress();
assertThat(inetSocketTransportAddress.address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10))));
// publish address
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class));
InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress();
assertThat(publishAddress.address().getHostName(), is("10.0.254.253"));
assertThat(publishAddress.address().getPort(), is(4321));
}
}
}

View File

@ -60,7 +60,6 @@ public class NettyTransportMultiPortTests extends ElasticsearchTestCase {
if (threadPool != null) {
threadPool.shutdownNow();
}
}
@Test

View File

@ -58,6 +58,9 @@ import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyTransportTests extends ElasticsearchIntegrationTest {
// static so we can use it in anonymous classes
private static String channelProfileName = null;
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
@ -76,6 +79,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
fail("Expected exception, but didnt happen");
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), containsString("MY MESSAGE"));
assertThat(channelProfileName, is(NettyTransport.DEFAULT_PROFILE));
}
}
@ -102,13 +106,13 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) {
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, NettyTransport.DEFAULT_PROFILE) {
@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version, name);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
@ -134,6 +138,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
logger.warn("Actual Exception", e1);
}
}
channelProfileName = transportChannel.getProfileName();
return action;
}