From 1902c66aaf95d88d6ec78e49bb3f73f7ad6c2ee9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 4 Nov 2015 23:11:03 +0100 Subject: [PATCH] Port of publishAddress should match port of corresponding boundAddress Closes #14535 Closes #14503 Closes #14514 Closes #14513 Closes #14476 --- .../elasticsearch/http/BindHttpException.java | 4 + .../http/netty/NettyHttpServerTransport.java | 24 +++- .../transport/netty/NettyTransport.java | 120 ++++++++++-------- .../netty/NettyTransportPublishAddressIT.java | 90 +++++++++++++ 4 files changed, 181 insertions(+), 57 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java diff --git a/core/src/main/java/org/elasticsearch/http/BindHttpException.java b/core/src/main/java/org/elasticsearch/http/BindHttpException.java index ebfc7e768eb..f8cac9f2a97 100644 --- a/core/src/main/java/org/elasticsearch/http/BindHttpException.java +++ b/core/src/main/java/org/elasticsearch/http/BindHttpException.java @@ -28,6 +28,10 @@ import java.io.IOException; */ public class BindHttpException extends HttpException { + public BindHttpException(String message) { + super(message); + } + public BindHttpException(String message, Throwable cause) { super(message, cause); } diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 3dbe3bf0df6..c6e9fbbcc8f 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -257,16 +257,28 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem return unmodifiableMap(new HashMap<>(profileBoundAddresses)); } - private InetSocketAddress createPublishAddress(String publishHosts[], int publishPort) { - try { - return new InetSocketAddress(networkService.resolvePublishHostAddresses(publishHosts), publishPort); - } catch (Exception e) { - throw new BindTransportException("Failed to resolve publish address", e); - } - } - private ClientBootstrap createClientBootstrap() { if (blockingClient) { @@ -449,14 +441,24 @@ public class NettyTransport extends AbstractLifecycleComponent implem } logger.debug("binding server bootstrap to: {}", (Object)addresses); } + + assert hostAddresses.length > 0; + + List boundAddresses = new ArrayList<>(); for (InetAddress hostAddress : hostAddresses) { - bindServerBootstrap(name, hostAddress, settings); + boundAddresses.add(bindToPort(name, hostAddress, settings.get("port"))); + } + + final BoundTransportAddress boundTransportAddress = createBoundTransportAddress(name, settings, boundAddresses); + + if (DEFAULT_PROFILE.equals(name)) { + this.boundAddress = boundTransportAddress; + } else { + profileBoundAddresses.put(name, boundTransportAddress); } } - private void bindServerBootstrap(final String name, final InetAddress hostAddress, Settings profileSettings) { - - String port = profileSettings.get("port"); + private InetSocketAddress bindToPort(final String name, final InetAddress hostAddress, String port) { PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); final AtomicReference boundSocket = new AtomicReference<>(); @@ -485,48 +487,64 @@ public class NettyTransport extends AbstractLifecycleComponent implem throw new BindTransportException("Failed to bind to [" + port + "]", lastException.get()); } - InetSocketAddress boundAddress = boundSocket.get(); - // TODO: We can remove the special casing for the default profile and store it in the profile map to reduce the complexity here - if (!DEFAULT_PROFILE.equals(name)) { - // check to see if an address is already bound for this profile - BoundTransportAddress boundTransportAddress = profileBoundAddresses().get(name); - if (boundTransportAddress == null) { - // no address is bound, so lets create one with the publish address information from the settings or the bound address as a fallback - int publishPort = profileSettings.getAsInt("publish_port", boundAddress.getPort()); - String publishHosts[] = profileSettings.getAsArray("publish_host", new String[] { boundAddress.getHostString() }); - InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort); - profileBoundAddresses.put(name, new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress))); - } else { - // TODO: support real multihoming with publishing. Today we update the bound addresses so only the prioritized address is published - // an address already exists. add the new bound address to the end of a new array and create a new BoundTransportAddress with the array and existing publish address - // the new bound address is appended in order to preserve the ordering/priority of bound addresses - TransportAddress[] existingBoundAddress = boundTransportAddress.boundAddresses(); - TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1); - updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress); - profileBoundAddresses.put(name, new BoundTransportAddress(updatedBoundAddresses, boundTransportAddress.publishAddress())); - } - } else { - if (this.boundAddress == null) { - // this is the first address that has been bound for the default profile so we get the publish address information and create a new BoundTransportAddress - // these calls are different from the profile ones due to the way the settings for a profile are created. If we want to merge the code for the default profile and - // other profiles together, we need to change how the profileSettings are built for the default profile... - int publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", boundAddress.getPort())); - String publishHosts[] = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null))); - InetSocketAddress publishAddress = createPublishAddress(publishHosts, publishPort); - this.boundAddress = new BoundTransportAddress(new TransportAddress[]{new InetSocketTransportAddress(boundAddress)}, new InetSocketTransportAddress(publishAddress)); - } else { - // the default profile is already bound to one address and has the publish address, copy the existing bound addresses as is and append the new address. - // the new bound address is appended in order to preserve the ordering/priority of bound addresses - TransportAddress[] existingBoundAddress = this.boundAddress.boundAddresses(); - TransportAddress[] updatedBoundAddresses = Arrays.copyOf(existingBoundAddress, existingBoundAddress.length + 1); - updatedBoundAddresses[updatedBoundAddresses.length - 1] = new InetSocketTransportAddress(boundAddress); - this.boundAddress = new BoundTransportAddress(updatedBoundAddresses, this.boundAddress.publishAddress()); - } - } - if (logger.isDebugEnabled()) { logger.debug("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); } + + return boundSocket.get(); + } + + private BoundTransportAddress createBoundTransportAddress(String name, Settings profileSettings, List boundAddresses) { + String[] boundAddressesHostStrings = new String[boundAddresses.size()]; + TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()]; + for (int i = 0; i < boundAddresses.size(); i++) { + InetSocketAddress boundAddress = boundAddresses.get(i); + boundAddressesHostStrings[i] = boundAddress.getHostString(); + transportBoundAddresses[i] = new InetSocketTransportAddress(boundAddress); + } + + final String[] publishHosts; + if (DEFAULT_PROFILE.equals(name)) { + publishHosts = settings.getAsArray("transport.netty.publish_host", settings.getAsArray("transport.publish_host", settings.getAsArray("transport.host", null))); + } else { + publishHosts = profileSettings.getAsArray("publish_host", boundAddressesHostStrings); + } + + final InetAddress publishInetAddress; + try { + publishInetAddress = networkService.resolvePublishHostAddresses(publishHosts); + } catch (Exception e) { + throw new BindTransportException("Failed to resolve publish address", e); + } + + Integer publishPort; + if (DEFAULT_PROFILE.equals(name)) { + publishPort = settings.getAsInt("transport.netty.publish_port", settings.getAsInt("transport.publish_port", null)); + } else { + publishPort = profileSettings.getAsInt("publish_port", null); + } + + // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress + if (publishPort == null) { + for (InetSocketAddress boundAddress : boundAddresses) { + InetAddress boundInetAddress = boundAddress.getAddress(); + if (boundInetAddress.isAnyLocalAddress() || boundInetAddress.equals(publishInetAddress)) { + publishPort = boundAddress.getPort(); + break; + } + } + } + + // if port still not matches, just take port of first bound address + if (publishPort == null) { + // TODO: In case of DEFAULT_PROFILE we should probably fail here, as publish address does not match any bound address + // In case of a custom profile, we might use the publish address of the default profile + publishPort = boundAddresses.get(0).getPort(); + logger.warn("Publish port not found by matching publish address [{}] to bound addresses [{}], falling back to port [{}] of first bound address", publishInetAddress, boundAddresses, publishPort); + } + + final TransportAddress publishAddress = new InetSocketTransportAddress(new InetSocketAddress(publishInetAddress, publishPort)); + return new BoundTransportAddress(transportBoundAddresses, publishAddress); } private void createServerBootstrap(String name, Settings settings) { diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java new file mode 100644 index 00000000000..3437701f6c9 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportPublishAddressIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty; + +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.common.network.NetworkUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportModule; + +import java.net.Inet4Address; +import java.net.Inet6Address; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; + +/** + * Checks that Elasticsearch produces a sane publish_address when it binds to + * different ports on ipv4 and ipv6. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class NettyTransportPublishAddressIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(TransportModule.TRANSPORT_TYPE_KEY, "netty") + .put("node.mode", "network").build(); + } + + public void testDifferentPorts() throws Exception { + if (!NetworkUtils.SUPPORTS_V6) { + return; + } + logger.info("--> starting a node on ipv4 only"); + Settings ipv4Settings = Settings.builder().put("network.host", "127.0.0.1").build(); + String ipv4OnlyNode = internalCluster().startNode(ipv4Settings); // should bind 127.0.0.1:XYZ + + logger.info("--> starting a node on ipv4 and ipv6"); + Settings bothSettings = Settings.builder().put("network.host", "_local_").build(); + internalCluster().startNode(bothSettings); // should bind [::1]:XYZ and 127.0.0.1:XYZ+1 + + logger.info("--> waiting for the cluster to declare itself stable"); + ensureStableCluster(2); // fails if port of publish address does not match corresponding bound address + + logger.info("--> checking if boundAddress matching publishAddress has same port"); + NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get(); + for (NodeInfo nodeInfo : nodesInfoResponse) { + BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getAddress(); + if (nodeInfo.getNode().getName().equals(ipv4OnlyNode)) { + assertThat(boundTransportAddress.boundAddresses().length, equalTo(1)); + assertThat(boundTransportAddress.boundAddresses()[0].getPort(), equalTo(boundTransportAddress.publishAddress().getPort())); + } else { + assertThat(boundTransportAddress.boundAddresses().length, greaterThan(1)); + for (TransportAddress boundAddress : boundTransportAddress.boundAddresses()) { + assertThat(boundAddress, instanceOf(InetSocketTransportAddress.class)); + InetSocketTransportAddress inetBoundAddress = (InetSocketTransportAddress) boundAddress; + if (inetBoundAddress.address().getAddress() instanceof Inet4Address) { + // IPv4 address is preferred publish address for _local_ + assertThat(inetBoundAddress.getPort(), equalTo(boundTransportAddress.publishAddress().getPort())); + } + } + } + } + } +}