diff --git a/core/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java b/core/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java index f3bbeb0cc74..28e4506b953 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java +++ b/core/src/main/java/org/elasticsearch/common/transport/BoundTransportAddress.java @@ -26,28 +26,31 @@ import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** - * A bounded transport address is a tuple of two {@link TransportAddress}, one that represents - * the address the transport is bounded on, the the published one represents the one clients should - * communicate on. + * A bounded transport address is a tuple of {@link TransportAddress}, one array that represents + * the addresses the transport is bound to, and the other is the published one that represents the address clients + * should communicate on. * * */ public class BoundTransportAddress implements Streamable { - private TransportAddress boundAddress; + private TransportAddress[] boundAddresses; private TransportAddress publishAddress; BoundTransportAddress() { } - public BoundTransportAddress(TransportAddress boundAddress, TransportAddress publishAddress) { - this.boundAddress = boundAddress; + public BoundTransportAddress(TransportAddress[] boundAddresses, TransportAddress publishAddress) { + if (boundAddresses == null || boundAddresses.length < 1) { + throw new IllegalArgumentException("at least one bound address must be provided"); + } + this.boundAddresses = boundAddresses; this.publishAddress = publishAddress; } - public TransportAddress boundAddress() { - return boundAddress; + public TransportAddress[] boundAddresses() { + return boundAddresses; } public TransportAddress publishAddress() { @@ -62,18 +65,38 @@ public class BoundTransportAddress implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { - boundAddress = TransportAddressSerializers.addressFromStream(in); + int boundAddressLength = in.readInt(); + boundAddresses = new TransportAddress[boundAddressLength]; + for (int i = 0; i < boundAddressLength; i++) { + boundAddresses[i] = TransportAddressSerializers.addressFromStream(in); + } publishAddress = TransportAddressSerializers.addressFromStream(in); } @Override public void writeTo(StreamOutput out) throws IOException { - TransportAddressSerializers.addressToStream(out, boundAddress); + out.writeInt(boundAddresses.length); + for (TransportAddress address : boundAddresses) { + TransportAddressSerializers.addressToStream(out, address); + } TransportAddressSerializers.addressToStream(out, publishAddress); } @Override public String toString() { - return "bound_address {" + boundAddress + "}, publish_address {" + publishAddress + "}"; + StringBuilder builder = new StringBuilder("publish_address {"); + builder.append(publishAddress); + builder.append("}, bound_addresses "); + boolean firstAdded = false; + for (TransportAddress address : boundAddresses) { + if (firstAdded) { + builder.append(", "); + } else { + firstAdded = true; + } + + builder.append("{").append(address).append("}"); + } + return builder.toString(); } } diff --git a/core/src/main/java/org/elasticsearch/http/HttpInfo.java b/core/src/main/java/org/elasticsearch/http/HttpInfo.java index 6b425d6f546..df51de96d3b 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpInfo.java +++ b/core/src/main/java/org/elasticsearch/http/HttpInfo.java @@ -57,7 +57,7 @@ public class HttpInfo implements Streamable, ToXContent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.HTTP); - builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString()); + builder.array(Fields.BOUND_ADDRESS, (Object[]) address.boundAddresses()); builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString()); builder.byteSizeField(Fields.MAX_CONTENT_LENGTH_IN_BYTES, Fields.MAX_CONTENT_LENGTH, maxContentLength); builder.endObject(); diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 634e38fe24b..a794f52dfaf 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -28,10 +28,7 @@ import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.transport.NetworkExceptionHelper; -import org.elasticsearch.common.transport.PortsRange; +import org.elasticsearch.common.transport.*; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -52,10 +49,8 @@ import org.jboss.netty.handler.timeout.ReadTimeoutException; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; @@ -255,12 +250,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent boundAddresses = new ArrayList<>(hostAddresses.length); for (InetAddress address : hostAddresses) { - bindAddress(address); + boundAddresses.add(bindAddress(address)); } - InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(0).getLocalAddress(); + InetSocketTransportAddress boundAddress = boundAddresses.get(0); InetSocketAddress publishAddress; if (0 == publishPort) { publishPort = boundAddress.getPort(); @@ -270,10 +266,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); @@ -296,7 +292,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent 0) { for (Map.Entry entry : profileAddresses.entrySet()) { builder.startObject(entry.getKey()); - builder.field(Fields.BOUND_ADDRESS, entry.getValue().boundAddress().toString()); + builder.array(Fields.BOUND_ADDRESS, (Object[]) entry.getValue().boundAddresses()); builder.field(Fields.PUBLISH_ADDRESS, entry.getValue().publishAddress().toString()); builder.endObject(); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 172a34e7e43..cecd034e7fe 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -171,6 +171,9 @@ public class TransportService extends AbstractLifecycleComponent entry : transport.profileBoundAddresses().entrySet()) { + logger.info("profile [{}]: {}", entry.getKey(), entry.getValue()); + } } boolean setStarted = started.compareAndSet(false, true); assert setStarted : "service was already started"; diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index a500fb386f8..8d2eb15a977 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -105,7 +105,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem if (previous != null) { throw new ElasticsearchException("local address [" + address + "] is already bound"); } - boundAddress = new BoundTransportAddress(localAddress, localAddress); + boundAddress = new BoundTransportAddress(new TransportAddress[] { localAddress }, localAddress); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index dd753e6beb7..b15add5445e 100644 --- a/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -89,7 +89,7 @@ public class LocalTransportChannel implements TransportChannel { public void sendResponse(Throwable error) throws IOException { BytesStreamOutput stream = new BytesStreamOutput(); writeResponseExceptionHeader(stream); - RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error); stream.writeThrowable(tx); final byte[] data = stream.bytes().toBytes(); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 059590bbac6..5cf9ed31d32 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -329,12 +329,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem createServerBootstrap(name, mergedSettings); bindServerBootstrap(name, mergedSettings); } - - InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(DEFAULT_PROFILE).get(0).getLocalAddress(); - int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort())); - String publishHost = settings.get("transport.netty.publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); - InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); - this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); } success = true; } finally { @@ -460,9 +454,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } - private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings settings) { + private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings profileSettings) { - String port = settings.get("port"); + String port = profileSettings.get("port"); PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); @@ -478,7 +472,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem serverChannels.put(name, list); } list.add(channel); - boundSocket.set((InetSocketAddress)channel.getLocalAddress()); + boundSocket.set((InetSocketAddress) channel.getLocalAddress()); } } catch (Exception e) { lastException.set(e); @@ -491,16 +485,48 @@ public class NettyTransport extends AbstractLifecycleComponent implem throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); } + InetSocketAddress boundAddress = boundSocket.get(); + // TODO: We can remove the special casing for the default profile and store it in the profile map to reduce the complexity here if (!DEFAULT_PROFILE.equals(name)) { - InetSocketAddress boundAddress = boundSocket.get(); - int publishPort = settings.getAsInt("publish_port", boundAddress.getPort()); - String publishHost = settings.get("publish_host", boundAddress.getHostString()); - InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); - // TODO: support real multihoming with publishing. Today we use putIfAbsent so only the prioritized address is published - profileBoundAddresses.putIfAbsent(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress))); + // check to see if an address is already bound for this profile + BoundTransportAddress boundTransportAddress = profileBoundAddresses().get(name); + if (boundTransportAddress == null) { + // no address is bound, so lets create one with the publish address information from the settings or the bound address as a fallback + int publishPort = profileSettings.getAsInt("publish_port", boundAddress.getPort()); + String publishHost = profileSettings.get("publish_host", boundAddress.getHostString()); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + profileBoundAddresses.put(name, new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress))); + } else { + // TODO: support real multihoming with publishing. Today we update the bound addresses so only the prioritized address is published + // an address already exists. add the new bound address to the end of a new array and create a new BoundTransportAddress with the array and existing publish address + // the new bound address is appended in order to preserve the ordering/priority of bound addresses + TransportAddress[] existingBoundAddress = boundTransportAddress.boundAddresses(); + TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1); + updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress); + profileBoundAddresses.put(name, new BoundTransportAddress(updatedBoundAddresses, boundTransportAddress.publishAddress())); + } + } else { + if (this.boundAddress == null) { + // this is the first address that has been bound for the default profile so we get the publish address information and create a new BoundTransportAddress + // these calls are different from the profile ones due to the way the settings for a profile are created. If we want to merge the code for the default profile and + // other profiles together, we need to change how the profileSettings are built for the default profile... + int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort())); + String publishHost = settings.get("transport.netty.publish_host", settings.get("transport.publish_host", settings.get("transport.host"))); + InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); + this.boundAddress = new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress)); + } else { + // the default profile is already bound to one address and has the publish address, copy the existing bound addresses as is and append the new address. + // the new bound address is appended in order to preserve the ordering/priority of bound addresses + TransportAddress[] existingBoundAddress = this.boundAddress.boundAddresses(); + TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1); + updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress); + this.boundAddress = new BoundTransportAddress(updatedBoundAddresses, this.boundAddress.publishAddress()); + } } - logger.info("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); + if (logger.isDebugEnabled()) { + logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); + } } private void createServerBootstrap(String name, Settings settings) { diff --git a/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java b/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java new file mode 100644 index 00000000000..41ed95a519e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.transport; + +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.*; + +/** + * Basic tests for the {@link BoundTransportAddress} class. These tests should not bind to any addresses but should + * just test things like serialization and exception handling... + */ +public class BoundTransportAddressTests extends ESTestCase { + + public void testSerialization() throws Exception { + InetAddress[] inetAddresses = InetAddress.getAllByName("0.0.0.0"); + List transportAddressList = new ArrayList<>(); + for (InetAddress address : inetAddresses) { + transportAddressList.add(new InetSocketTransportAddress(address, randomIntBetween(9200, 9299))); + } + final BoundTransportAddress transportAddress = new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0)); + assertThat(transportAddress.boundAddresses().length, equalTo(transportAddressList.size())); + + // serialize + BytesStreamOutput streamOutput = new BytesStreamOutput(); + transportAddress.writeTo(streamOutput); + StreamInput in = ByteBufferStreamInput.wrap(streamOutput.bytes()); + + BoundTransportAddress serializedAddress; + if (randomBoolean()) { + serializedAddress = BoundTransportAddress.readBoundTransportAddress(in); + } else { + serializedAddress = new BoundTransportAddress(); + serializedAddress.readFrom(in); + } + + assertThat(serializedAddress, not(sameInstance(transportAddress))); + assertThat(serializedAddress.boundAddresses().length, equalTo(transportAddress.boundAddresses().length)); + assertThat(serializedAddress.publishAddress(), equalTo(transportAddress.publishAddress())); + + TransportAddress[] serializedBoundAddresses = serializedAddress.boundAddresses(); + TransportAddress[] boundAddresses = transportAddress.boundAddresses(); + for (int i = 0; i < serializedBoundAddresses.length; i++) { + assertThat(serializedBoundAddresses[i], equalTo(boundAddresses[i])); + } + } + + public void testBadBoundAddressArray() { + try { + TransportAddress[] badArray = randomBoolean() ? null : new TransportAddress[0]; + new BoundTransportAddress(badArray, new InetSocketTransportAddress(InetAddress.getLoopbackAddress(), 80)); + fail("expected an exception to be thrown due to no bound address"); + } catch (IllegalArgumentException e) { + //expected + } + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java index 2070076ef35..74e30d5f319 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java @@ -95,7 +95,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { Settings settings = settingsBuilder().put("http.pipelining", true).build(); httpServerTransport = new CustomNettyHttpServerTransport(settings); httpServerTransport.start(); - InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { @@ -110,7 +110,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { Settings settings = settingsBuilder().put("http.pipelining", false).build(); httpServerTransport = new CustomNettyHttpServerTransport(settings); httpServerTransport.start(); - InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java index 1d968b75b73..964f9851b09 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java @@ -56,7 +56,7 @@ public class NettyPipeliningDisabledIT extends ESIntegTestCase { List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"); HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); - InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java index 84a0f9b3642..eafd242ec33 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java @@ -52,7 +52,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/"); HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); - InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 3003a7ba595..44039ce157f 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -70,7 +70,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { TransportService transportService = new TransportService(nettyTransport, threadPool); nettyTransport.transportServiceAdapter(transportService.createAdapter()); - InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) nettyTransport.boundAddress().boundAddress(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(nettyTransport.boundAddress().boundAddresses()); port = transportAddress.address().getPort(); host = transportAddress.address().getAddress(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java index 48c02153f27..9a6486134da 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java @@ -26,10 +26,9 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.Network; -import org.elasticsearch.test.transport.AssertingLocalTransport; -import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.junit.Test; @@ -86,11 +85,15 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase { for (NodeInfo nodeInfo : response.getNodes()) { assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1)); assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1")); - assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(), instanceOf(InetSocketTransportAddress.class)); + for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) { + assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class)); + } - // bound address - InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(); - assertThat(inetSocketTransportAddress.address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); + // bound addresses + for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) { + assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class)); + assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); + } // publish address assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class));