Port of publishAddress should match port of corresponding boundAddress

Closes #14535
Closes #14503
Closes #14514
Closes #14513
Closes #14476
This commit is contained in:
Yannick Welsch 2015-11-04 23:11:03 +01:00
parent efde8e8a26
commit 1902c66aaf
4 changed files with 181 additions and 57 deletions

View File

@ -28,6 +28,10 @@ import java.io.IOException;
*/
public class BindHttpException extends HttpException {
public BindHttpException(String message) {
super(message);
}
public BindHttpException(String message, Throwable cause) {
super(message, cause);
}

View File

@ -257,16 +257,28 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
boundAddresses.add(bindAddress(address));
}
InetSocketTransportAddress boundAddress = boundAddresses.get(0);
InetSocketAddress publishAddress;
if (0 == publishPort) {
publishPort = boundAddress.getPort();
}
final InetAddress publishInetAddress;
try {
publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddresses(publishHosts), publishPort);
publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
if (0 == publishPort) {
for (InetSocketTransportAddress boundAddress : boundAddresses) {
InetAddress boundInetAddress = boundAddress.address().getAddress();
if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) {
publishPort = boundAddress.getPort();
break;
}
}
}
if (0 == publishPort) {
throw new BindHttpException("Publish address [" + publishInetAddress + "] does not match any of the bound addresses [" + boundAddresses + "]");
}
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);;
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[boundAddresses.size()]), new InetSocketTransportAddress(publishAddress));
}

View File

@ -343,14 +343,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return unmodifiableMap(new HashMap<>(profileBoundAddresses));
}
private InetSocketAddress createPublishAddress(String publishHosts[], int publishPort) {
try {
return new InetSocketAddress(networkService.resolvePublishHostAddresses(publishHosts), publishPort);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
}
private ClientBootstrap createClientBootstrap() {
if (blockingClient) {
@ -449,14 +441,24 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
logger.debug("binding server bootstrap to: {}", (Object)addresses);
}
assert hostAddresses.length > 0;
List<InetSocketAddress> boundAddresses = new ArrayList<>();
for (InetAddress hostAddress : hostAddresses) {
bindServerBootstrap(name, hostAddress, settings);
boundAddresses.add(bindToPort(name, hostAddress, settings.get("port")));
}
final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, settings, boundAddresses);
if (DEFAULT_PROFILE.equals(name)) {
this.boundAddress = boundTransportAddress;
} else {
profileBoundAddresses.put(name, boundTransportAddress);
}
}
private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings profileSettings) {
String port = profileSettings.get("port");
private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) {
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
@ -485,48 +487,64 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
}
InetSocketAddress boundAddress = boundSocket.get();
// TODO: We can remove the special casing for the default profile and store it in the profile map to reduce the complexity here
if (!DEFAULT_PROFILE.equals(name)) {
// check to see if an address is already bound for this profile
BoundTransportAddress boundTransportAddress = profileBoundAddresses().get(name);
if (boundTransportAddress == null) {
// no address is bound, so lets create one with the publish address information from the settings or the bound address as a fallback
int publishPort = profileSettings.getAsInt("publish_port", boundAddress.getPort());
String publishHosts[] = profileSettings.getAsArray("publish_host", new String[] { boundAddress.getHostString() });
InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort);
profileBoundAddresses.put(name, new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress)));
} else {
// TODO: support real multihoming with publishing. Today we update the bound addresses so only the prioritized address is published
// an address already exists. add the new bound address to the end of a new array and create a new BoundTransportAddress with the array and existing publish address
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
TransportAddress[] existingBoundAddress = boundTransportAddress.boundAddresses();
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
profileBoundAddresses.put(name, new BoundTransportAddress(updatedBoundAddresses, boundTransportAddress.publishAddress()));
}
} else {
if (this.boundAddress == null) {
// this is the first address that has been bound for the default profile so we get the publish address information and create a new BoundTransportAddress
// these calls are different from the profile ones due to the way the settings for a profile are created. If we want to merge the code for the default profile and
// other profiles together, we need to change how the profileSettings are built for the default profile...
int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort()));
String publishHosts[] = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null)));
InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort);
this.boundAddress = new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress));
} else {
// the default profile is already bound to one address and has the publish address, copy the existing bound addresses as is and append the new address.
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
TransportAddress[] existingBoundAddress = this.boundAddress.boundAddresses();
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
this.boundAddress = new BoundTransportAddress(updatedBoundAddresses, this.boundAddress.publishAddress());
}
}
if (logger.isDebugEnabled()) {
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
}
return boundSocket.get();
}
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List<InetSocketAddress> boundAddresses) {
String[] boundAddressesHostStrings = new String[boundAddresses.size()];
TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
for (int i = 0; i < boundAddresses.size(); i++) {
InetSocketAddress boundAddress = boundAddresses.get(i);
boundAddressesHostStrings[i] = boundAddress.getHostString();
transportBoundAddresses[i] = new InetSocketTransportAddress(boundAddress);
}
final String[] publishHosts;
if (DEFAULT_PROFILE.equals(name)) {
publishHosts = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null)));
} else {
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
}
final InetAddress publishInetAddress;
try {
publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
Integer publishPort;
if (DEFAULT_PROFILE.equals(name)) {
publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", null));
} else {
publishPort = profileSettings.getAsInt("publish_port", null);
}
// if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress
if (publishPort == null) {
for (InetSocketAddress boundAddress : boundAddresses) {
InetAddress boundInetAddress = boundAddress.getAddress();
if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) {
publishPort = boundAddress.getPort();
break;
}
}
}
// if port still not matches, just take port of first bound address
if (publishPort == null) {
// TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address
// In case of a custom profile, we might use the publish address of the default profile
publishPort = boundAddresses.get(0).getPort();
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
}
final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
return new BoundTransportAddress(transportBoundAddresses, publishAddress);
}
private void createServerBootstrap(String name, Settings settings) {

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportModule;
import java.net.Inet4Address;
import java.net.Inet6Address;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
/**
* Checks that Elasticsearch produces a sane publish_address when it binds to
* different ports on ipv4 and ipv6.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class NettyTransportPublishAddressIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(TransportModule.TRANSPORT_TYPE_KEY, "netty")
.put("node.mode", "network").build();
}
public void testDifferentPorts() throws Exception {
if (!NetworkUtils.SUPPORTS_V6) {
return;
}
logger.info("--> starting a node on ipv4 only");
Settings ipv4Settings = Settings.builder().put("network.host", "127.0.0.1").build();
String ipv4OnlyNode = internalCluster().startNode(ipv4Settings); // should bind 127.0.0.1:XYZ
logger.info("--> starting a node on ipv4 and ipv6");
Settings bothSettings = Settings.builder().put("network.host", "_local_").build();
internalCluster().startNode(bothSettings); // should bind [::1]:XYZ and 127.0.0.1:XYZ+1
logger.info("--> waiting for the cluster to declare itself stable");
ensureStableCluster(2); // fails if port of publish address does not match corresponding bound address
logger.info("--> checking if boundAddress matching publishAddress has same port");
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
for (NodeInfo nodeInfo : nodesInfoResponse) {
BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getAddress();
if (nodeInfo.getNode().getName().equals(ipv4OnlyNode)) {
assertThat(boundTransportAddress.boundAddresses().length, equalTo(1));
assertThat(boundTransportAddress.boundAddresses()[0].getPort(), equalTo(boundTransportAddress.publishAddress().getPort()));
} else {
assertThat(boundTransportAddress.boundAddresses().length, greaterThan(1));
for (TransportAddress boundAddress : boundTransportAddress.boundAddresses()) {
assertThat(boundAddress, instanceOf(InetSocketTransportAddress.class));
InetSocketTransportAddress inetBoundAddress = (InetSocketTransportAddress) boundAddress;
if (inetBoundAddress.address().getAddress() instanceof Inet4Address) {
// IPv4 address is preferred publish address for _local_
assertThat(inetBoundAddress.getPort(), equalTo(boundTransportAddress.publishAddress().getPort()));
}
}
}
}
}
}