Merge pull request #14535 from ywelsch/fix/bound-vs-publish-port
Port of publishAddress should match port of corresponding boundAddress
This commit is contained in:
commit
1c31845b7a
|
@ -28,6 +28,10 @@ import java.io.IOException;
|
|||
*/
|
||||
public class BindHttpException extends HttpException {
|
||||
|
||||
public BindHttpException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public BindHttpException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
|
|
@ -257,16 +257,28 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
|||
boundAddresses.add(bindAddress(address));
|
||||
}
|
||||
|
||||
InetSocketTransportAddress boundAddress = boundAddresses.get(0);
|
||||
InetSocketAddress publishAddress;
|
||||
if (0 == publishPort) {
|
||||
publishPort = boundAddress.getPort();
|
||||
}
|
||||
final InetAddress publishInetAddress;
|
||||
try {
|
||||
publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddresses(publishHosts), publishPort);
|
||||
publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);
|
||||
} catch (Exception e) {
|
||||
throw new BindTransportException("Failed to resolve publish address", e);
|
||||
}
|
||||
|
||||
if (0 == publishPort) {
|
||||
for (InetSocketTransportAddress boundAddress : boundAddresses) {
|
||||
InetAddress boundInetAddress = boundAddress.address().getAddress();
|
||||
if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) {
|
||||
publishPort = boundAddress.getPort();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == publishPort) {
|
||||
throw new BindHttpException("Publish address [" + publishInetAddress + "] does not match any of the bound addresses [" + boundAddresses + "]");
|
||||
}
|
||||
|
||||
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);;
|
||||
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[boundAddresses.size()]), new InetSocketTransportAddress(publishAddress));
|
||||
}
|
||||
|
||||
|
|
|
@ -343,14 +343,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
return unmodifiableMap(new HashMap<>(profileBoundAddresses));
|
||||
}
|
||||
|
||||
private InetSocketAddress createPublishAddress(String publishHosts[], int publishPort) {
|
||||
try {
|
||||
return new InetSocketAddress(networkService.resolvePublishHostAddresses(publishHosts), publishPort);
|
||||
} catch (Exception e) {
|
||||
throw new BindTransportException("Failed to resolve publish address", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ClientBootstrap createClientBootstrap() {
|
||||
|
||||
if (blockingClient) {
|
||||
|
@ -449,14 +441,24 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
}
|
||||
logger.debug("binding server bootstrap to: {}", (Object)addresses);
|
||||
}
|
||||
|
||||
assert hostAddresses.length > 0;
|
||||
|
||||
List<InetSocketAddress> boundAddresses = new ArrayList<>();
|
||||
for (InetAddress hostAddress : hostAddresses) {
|
||||
bindServerBootstrap(name, hostAddress, settings);
|
||||
boundAddresses.add(bindToPort(name, hostAddress, settings.get("port")));
|
||||
}
|
||||
|
||||
final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, settings, boundAddresses);
|
||||
|
||||
if (DEFAULT_PROFILE.equals(name)) {
|
||||
this.boundAddress = boundTransportAddress;
|
||||
} else {
|
||||
profileBoundAddresses.put(name, boundTransportAddress);
|
||||
}
|
||||
}
|
||||
|
||||
private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings profileSettings) {
|
||||
|
||||
String port = profileSettings.get("port");
|
||||
private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) {
|
||||
PortsRange portsRange = new PortsRange(port);
|
||||
final AtomicReference<Exception> lastException = new AtomicReference<>();
|
||||
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
|
||||
|
@ -485,48 +487,64 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get());
|
||||
}
|
||||
|
||||
InetSocketAddress boundAddress = boundSocket.get();
|
||||
// TODO: We can remove the special casing for the default profile and store it in the profile map to reduce the complexity here
|
||||
if (!DEFAULT_PROFILE.equals(name)) {
|
||||
// check to see if an address is already bound for this profile
|
||||
BoundTransportAddress boundTransportAddress = profileBoundAddresses().get(name);
|
||||
if (boundTransportAddress == null) {
|
||||
// no address is bound, so lets create one with the publish address information from the settings or the bound address as a fallback
|
||||
int publishPort = profileSettings.getAsInt("publish_port", boundAddress.getPort());
|
||||
String publishHosts[] = profileSettings.getAsArray("publish_host", new String[] { boundAddress.getHostString() });
|
||||
InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort);
|
||||
profileBoundAddresses.put(name, new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress)));
|
||||
} else {
|
||||
// TODO: support real multihoming with publishing. Today we update the bound addresses so only the prioritized address is published
|
||||
// an address already exists. add the new bound address to the end of a new array and create a new BoundTransportAddress with the array and existing publish address
|
||||
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
|
||||
TransportAddress[] existingBoundAddress = boundTransportAddress.boundAddresses();
|
||||
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
|
||||
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
|
||||
profileBoundAddresses.put(name, new BoundTransportAddress(updatedBoundAddresses, boundTransportAddress.publishAddress()));
|
||||
}
|
||||
} else {
|
||||
if (this.boundAddress == null) {
|
||||
// this is the first address that has been bound for the default profile so we get the publish address information and create a new BoundTransportAddress
|
||||
// these calls are different from the profile ones due to the way the settings for a profile are created. If we want to merge the code for the default profile and
|
||||
// other profiles together, we need to change how the profileSettings are built for the default profile...
|
||||
int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort()));
|
||||
String publishHosts[] = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null)));
|
||||
InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort);
|
||||
this.boundAddress = new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress));
|
||||
} else {
|
||||
// the default profile is already bound to one address and has the publish address, copy the existing bound addresses as is and append the new address.
|
||||
// the new bound address is appended in order to preserve the ordering/priority of bound addresses
|
||||
TransportAddress[] existingBoundAddress = this.boundAddress.boundAddresses();
|
||||
TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1);
|
||||
updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress);
|
||||
this.boundAddress = new BoundTransportAddress(updatedBoundAddresses, this.boundAddress.publishAddress());
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
|
||||
}
|
||||
|
||||
return boundSocket.get();
|
||||
}
|
||||
|
||||
private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List<InetSocketAddress> boundAddresses) {
|
||||
String[] boundAddressesHostStrings = new String[boundAddresses.size()];
|
||||
TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
|
||||
for (int i = 0; i < boundAddresses.size(); i++) {
|
||||
InetSocketAddress boundAddress = boundAddresses.get(i);
|
||||
boundAddressesHostStrings[i] = boundAddress.getHostString();
|
||||
transportBoundAddresses[i] = new InetSocketTransportAddress(boundAddress);
|
||||
}
|
||||
|
||||
final String[] publishHosts;
|
||||
if (DEFAULT_PROFILE.equals(name)) {
|
||||
publishHosts = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null)));
|
||||
} else {
|
||||
publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings);
|
||||
}
|
||||
|
||||
final InetAddress publishInetAddress;
|
||||
try {
|
||||
publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts);
|
||||
} catch (Exception e) {
|
||||
throw new BindTransportException("Failed to resolve publish address", e);
|
||||
}
|
||||
|
||||
Integer publishPort;
|
||||
if (DEFAULT_PROFILE.equals(name)) {
|
||||
publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", null));
|
||||
} else {
|
||||
publishPort = profileSettings.getAsInt("publish_port", null);
|
||||
}
|
||||
|
||||
// if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress
|
||||
if (publishPort == null) {
|
||||
for (InetSocketAddress boundAddress : boundAddresses) {
|
||||
InetAddress boundInetAddress = boundAddress.getAddress();
|
||||
if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) {
|
||||
publishPort = boundAddress.getPort();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if port still not matches, just take port of first bound address
|
||||
if (publishPort == null) {
|
||||
// TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address
|
||||
// In case of a custom profile, we might use the publish address of the default profile
|
||||
publishPort = boundAddresses.get(0).getPort();
|
||||
logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort);
|
||||
}
|
||||
|
||||
final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
|
||||
return new BoundTransportAddress(transportBoundAddresses, publishAddress);
|
||||
}
|
||||
|
||||
private void createServerBootstrap(String name, Settings settings) {
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
import org.elasticsearch.common.network.NetworkUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.transport.TransportModule;
|
||||
|
||||
import java.net.Inet4Address;
|
||||
import java.net.Inet6Address;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
/**
|
||||
* Checks that Elasticsearch produces a sane publish_address when it binds to
|
||||
* different ports on ipv4 and ipv6.
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class NettyTransportPublishAddressIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(TransportModule.TRANSPORT_TYPE_KEY, "netty")
|
||||
.put("node.mode", "network").build();
|
||||
}
|
||||
|
||||
public void testDifferentPorts() throws Exception {
|
||||
if (!NetworkUtils.SUPPORTS_V6) {
|
||||
return;
|
||||
}
|
||||
logger.info("--> starting a node on ipv4 only");
|
||||
Settings ipv4Settings = Settings.builder().put("network.host", "127.0.0.1").build();
|
||||
String ipv4OnlyNode = internalCluster().startNode(ipv4Settings); // should bind 127.0.0.1:XYZ
|
||||
|
||||
logger.info("--> starting a node on ipv4 and ipv6");
|
||||
Settings bothSettings = Settings.builder().put("network.host", "_local_").build();
|
||||
internalCluster().startNode(bothSettings); // should bind [::1]:XYZ and 127.0.0.1:XYZ+1
|
||||
|
||||
logger.info("--> waiting for the cluster to declare itself stable");
|
||||
ensureStableCluster(2); // fails if port of publish address does not match corresponding bound address
|
||||
|
||||
logger.info("--> checking if boundAddress matching publishAddress has same port");
|
||||
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
|
||||
for (NodeInfo nodeInfo : nodesInfoResponse) {
|
||||
BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getAddress();
|
||||
if (nodeInfo.getNode().getName().equals(ipv4OnlyNode)) {
|
||||
assertThat(boundTransportAddress.boundAddresses().length, equalTo(1));
|
||||
assertThat(boundTransportAddress.boundAddresses()[0].getPort(), equalTo(boundTransportAddress.publishAddress().getPort()));
|
||||
} else {
|
||||
assertThat(boundTransportAddress.boundAddresses().length, greaterThan(1));
|
||||
for (TransportAddress boundAddress : boundTransportAddress.boundAddresses()) {
|
||||
assertThat(boundAddress, instanceOf(InetSocketTransportAddress.class));
|
||||
InetSocketTransportAddress inetBoundAddress = (InetSocketTransportAddress) boundAddress;
|
||||
if (inetBoundAddress.address().getAddress() instanceof Inet4Address) {
|
||||
// IPv4 address is preferred publish address for _local_
|
||||
assertThat(inetBoundAddress.getPort(), equalTo(boundTransportAddress.publishAddress().getPort()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue