expose all addresses that the transports are bound to

In #12942, the NettyTransport and NettyHttpServerTransport were updated to allow for binding
to multiple addresses. However, the BoundTransportAddress holder only exposed the first address
that the transport was bound to and this object is used to populate the values returned to the user
via our APIs.

This change exposes all of the bound addresses in the BoundTransportAddress holder, which allows
for an accurate representation of all interfaces that elasticsearch is bound to and listening on.
This commit is contained in:
jaymode 2015-09-15 14:26:20 -04:00
parent a77c68ba0e
commit 5b8b15e729
14 changed files with 191 additions and 55 deletions

View File

@ -26,28 +26,31 @@ import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* A bounded transport address is a tuple of two {@link TransportAddress}, one that represents
* the address the transport is bounded on, the the published one represents the one clients should
* communicate on.
* A bounded transport address is a tuple of {@link TransportAddress}, one array that represents
* the addresses the transport is bound to, and the other is the published one that represents the address clients
* should communicate on.
*
*
*/
public class BoundTransportAddress implements Streamable {
private TransportAddress boundAddress;
private TransportAddress[] boundAddresses;
private TransportAddress publishAddress;
BoundTransportAddress() {
}
public BoundTransportAddress(TransportAddress boundAddress, TransportAddress publishAddress) {
this.boundAddress = boundAddress;
public BoundTransportAddress(TransportAddress[] boundAddresses, TransportAddress publishAddress) {
if (boundAddresses == null || boundAddresses.length < 1) {
throw new IllegalArgumentException("at least one bound address must be provided");
}
this.boundAddresses = boundAddresses;
this.publishAddress = publishAddress;
}
public TransportAddress boundAddress() {
return boundAddress;
public TransportAddress[] boundAddresses() {
return boundAddresses;
}
public TransportAddress publishAddress() {
@ -62,18 +65,38 @@ public class BoundTransportAddress implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
boundAddress = TransportAddressSerializers.addressFromStream(in);
int boundAddressLength = in.readInt();
boundAddresses = new TransportAddress[boundAddressLength];
for (int i = 0; i < boundAddressLength; i++) {
boundAddresses[i] = TransportAddressSerializers.addressFromStream(in);
}
publishAddress = TransportAddressSerializers.addressFromStream(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
TransportAddressSerializers.addressToStream(out, boundAddress);
out.writeInt(boundAddresses.length);
for (TransportAddress address : boundAddresses) {
TransportAddressSerializers.addressToStream(out, address);
}
TransportAddressSerializers.addressToStream(out, publishAddress);
}
@Override
public String toString() {
return "bound_address {" + boundAddress + "}, publish_address {" + publishAddress + "}";
StringBuilder builder = new StringBuilder("publish_address {");
builder.append(publishAddress);
builder.append("}, bound_addresses ");
boolean firstAdded = false;
for (TransportAddress address : boundAddresses) {
if (firstAdded) {
builder.append(", ");
} else {
firstAdded = true;
}
builder.append("{").append(address).append("}");
}
return builder.toString();
}
}

View File

@ -57,7 +57,7 @@ public class HttpInfo implements Streamable, ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.HTTP);
builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString());
builder.array(Fields.BOUND_ADDRESS, (Object[]) address.boundAddresses());
builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString());
builder.byteSizeField(Fields.MAX_CONTENT_LENGTH_IN_BYTES, Fields.MAX_CONTENT_LENGTH, maxContentLength);
builder.endObject();

View File

@ -28,10 +28,7 @@ import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.*;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
@ -52,10 +49,8 @@ import org.jboss.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@ -255,12 +250,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
} catch (IOException e) {
throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
}
List<InetSocketTransportAddress> boundAddresses = new ArrayList<>(hostAddresses.length);
for (InetAddress address : hostAddresses) {
bindAddress(address);
boundAddresses.add(bindAddress(address));
}
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(0).getLocalAddress();
InetSocketTransportAddress boundAddress = boundAddresses.get(0);
InetSocketAddress publishAddress;
if (0 == publishPort) {
publishPort = boundAddress.getPort();
@ -270,10 +266,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[boundAddresses.size()]), new InetSocketTransportAddress(publishAddress));
}
private void bindAddress(final InetAddress hostAddress) {
private InetSocketTransportAddress bindAddress(final InetAddress hostAddress) {
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
@ -296,7 +292,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
if (!success) {
throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
}
logger.info("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new InetSocketTransportAddress(boundSocket.get());
}
@Override

View File

@ -58,13 +58,13 @@ public class TransportInfo implements Streamable, ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.TRANSPORT);
builder.field(Fields.BOUND_ADDRESS, address.boundAddress().toString());
builder.array(Fields.BOUND_ADDRESS, (Object[]) address.boundAddresses());
builder.field(Fields.PUBLISH_ADDRESS, address.publishAddress().toString());
builder.startObject(Fields.PROFILES);
if (profileAddresses != null && profileAddresses.size() > 0) {
for (Map.Entry<String, BoundTransportAddress> entry : profileAddresses.entrySet()) {
builder.startObject(entry.getKey());
builder.field(Fields.BOUND_ADDRESS, entry.getValue().boundAddress().toString());
builder.array(Fields.BOUND_ADDRESS, (Object[]) entry.getValue().boundAddresses());
builder.field(Fields.PUBLISH_ADDRESS, entry.getValue().publishAddress().toString());
builder.endObject();
}

View File

@ -171,6 +171,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
for (Map.Entry<String, BoundTransportAddress> entry : transport.profileBoundAddresses().entrySet()) {
logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());
}
}
boolean setStarted = started.compareAndSet(false, true);
assert setStarted : "service was already started";

View File

@ -105,7 +105,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
if (previous != null) {
throw new ElasticsearchException("local address [" + address + "] is already bound");
}
boundAddress = new BoundTransportAddress(localAddress, localAddress);
boundAddress = new BoundTransportAddress(new TransportAddress[] { localAddress }, localAddress);
}
@Override

View File

@ -89,7 +89,7 @@ public class LocalTransportChannel implements TransportChannel {
public void sendResponse(Throwable error) throws IOException {
BytesStreamOutput stream = new BytesStreamOutput();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddresses()[0], action, error);
stream.writeThrowable(tx);
final byte[] data = stream.bytes().toBytes();

View File

@ -329,12 +329,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
createServerBootstrap(name, mergedSettings);
bindServerBootstrap(name, mergedSettings);
}
InetSocketAddress boundAddress = (InetSocketAddress) serverChannels.get(DEFAULT_PROFILE).get(0).getLocalAddress();
int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort()));
String publishHost = settings.get("transport.netty.publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
success = true;
} finally {
@ -460,9 +454,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings settings) {
private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings profileSettings) {
String port = settings.get("port");
String port = profileSettings.get("port");
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
@ -478,7 +472,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverChannels.put(name, list);
}
list.add(channel);
boundSocket.set((InetSocketAddress)channel.getLocalAddress());
boundSocket.set((InetSocketAddress) channel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
@ -491,16 +485,48 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
InetSocketAddress boundAddress = boundSocket.get();
// TODO: We can remove the special casing for the default profile and store it in the profile map to reduce the complexity here
if (!DEFAULT_PROFILE.equals(name)) {
InetSocketAddress boundAddress = boundSocket.get();
int publishPort = settings.getAsInt("publish_port", boundAddress.getPort());
String publishHost = settings.get("publish_host", boundAddress.getHostString());
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
// TODO: support real multihoming with publishing. Today we use putIfAbsent so only the prioritized address is published
profileBoundAddresses.putIfAbsent(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)));
// check to see if an address is already bound for this profile
BoundTransportAddress boundTransportAddress = profileBoundAddresses().get(name);
if (boundTransportAddress == null) {
// no address is bound, so lets create one with the publish address information from the settings or the bound address as a fallback
int publishPort = profileSettings.getAsInt("publish_port", boundAddress.getPort());
String publishHost = profileSettings.get("publish_host", boundAddress.getHostString());
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
profileBoundAddresses.put(name, new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress)));
} else {
// TODO: support real multihoming with publishing. Today we update the bound addresses so only the prioritized address is published
// an address already exists. add the new bound address to the end of a new array and create a new BoundTransportAddress with the array and existing publish address
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
TransportAddress[] existingBoundAddress = boundTransportAddress.boundAddresses();
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
profileBoundAddresses.put(name, new BoundTransportAddress(updatedBoundAddresses, boundTransportAddress.publishAddress()));
}
} else {
if (this.boundAddress == null) {
// this is the first address that has been bound for the default profile so we get the publish address information and create a new BoundTransportAddress
// these calls are different from the profile ones due to the way the settings for a profile are created. If we want to merge the code for the default profile and
// other profiles together, we need to change how the profileSettings are built for the default profile...
int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort()));
String publishHost = settings.get("transport.netty.publish_host", settings.get("transport.publish_host", settings.get("transport.host")));
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
this.boundAddress = new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress));
} else {
// the default profile is already bound to one address and has the publish address, copy the existing bound addresses as is and append the new address.
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
TransportAddress[] existingBoundAddress = this.boundAddress.boundAddresses();
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
this.boundAddress = new BoundTransportAddress(updatedBoundAddresses, this.boundAddress.publishAddress());
}
}
logger.info("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
if (logger.isDebugEnabled()) {
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
}
}
private void createServerBootstrap(String name, Settings settings) {

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.transport;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.Matchers.*;
/**
* Basic tests for the {@link BoundTransportAddress} class. These tests should not bind to any addresses but should
* just test things like serialization and exception handling...
*/
public class BoundTransportAddressTests extends ESTestCase {
public void testSerialization() throws Exception {
InetAddress[] inetAddresses = InetAddress.getAllByName("0.0.0.0");
List<InetSocketTransportAddress> transportAddressList = new ArrayList<>();
for (InetAddress address : inetAddresses) {
transportAddressList.add(new InetSocketTransportAddress(address, randomIntBetween(9200, 9299)));
}
final BoundTransportAddress transportAddress = new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0));
assertThat(transportAddress.boundAddresses().length, equalTo(transportAddressList.size()));
// serialize
BytesStreamOutput streamOutput = new BytesStreamOutput();
transportAddress.writeTo(streamOutput);
StreamInput in = ByteBufferStreamInput.wrap(streamOutput.bytes());
BoundTransportAddress serializedAddress;
if (randomBoolean()) {
serializedAddress = BoundTransportAddress.readBoundTransportAddress(in);
} else {
serializedAddress = new BoundTransportAddress();
serializedAddress.readFrom(in);
}
assertThat(serializedAddress, not(sameInstance(transportAddress)));
assertThat(serializedAddress.boundAddresses().length, equalTo(transportAddress.boundAddresses().length));
assertThat(serializedAddress.publishAddress(), equalTo(transportAddress.publishAddress()));
TransportAddress[] serializedBoundAddresses = serializedAddress.boundAddresses();
TransportAddress[] boundAddresses = transportAddress.boundAddresses();
for (int i = 0; i < serializedBoundAddresses.length; i++) {
assertThat(serializedBoundAddresses[i], equalTo(boundAddresses[i]));
}
}
public void testBadBoundAddressArray() {
try {
TransportAddress[] badArray = randomBoolean() ? null : new TransportAddress[0];
new BoundTransportAddress(badArray, new InetSocketTransportAddress(InetAddress.getLoopbackAddress(), 80));
fail("expected an exception to be thrown due to no bound address");
} catch (IllegalArgumentException e) {
//expected
}
}
}

View File

@ -95,7 +95,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
Settings settings = settingsBuilder().put("http.pipelining", true).build();
httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
@ -110,7 +110,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
Settings settings = settingsBuilder().put("http.pipelining", false).build();
httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {

View File

@ -56,7 +56,7 @@ public class NettyPipeliningDisabledIT extends ESIntegTestCase {
List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/");
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));

View File

@ -52,7 +52,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase {
List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/");
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));

View File

@ -70,7 +70,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
TransportService transportService = new TransportService(nettyTransport, threadPool);
nettyTransport.transportServiceAdapter(transportService.createAdapter());
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) nettyTransport.boundAddress().boundAddress();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(nettyTransport.boundAddress().boundAddresses());
port = transportAddress.address().getPort();
host = transportAddress.address().getAddress();

View File

@ -26,10 +26,9 @@ import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.Network;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
@ -86,11 +85,15 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
for (NodeInfo nodeInfo : response.getNodes()) {
assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1));
assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1"));
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress(), instanceOf(InetSocketTransportAddress.class));
for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) {
assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class));
}
// bound address
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddress();
assertThat(inetSocketTransportAddress.address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10))));
// bound addresses
for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) {
assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class));
assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10))));
}
// publish address
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class));