diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index cc5ff81e8d8..4d8229e514c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -360,16 +360,16 @@ public class DiscoveryNode implements Streamable, ToXContent { public String toString() { StringBuilder sb = new StringBuilder(); if (nodeName.length() > 0) { - sb.append('[').append(nodeName).append(']'); + sb.append('{').append(nodeName).append('}'); } if (nodeId != null) { - sb.append('[').append(nodeId).append(']'); + sb.append('{').append(nodeId).append('}'); } if (Strings.hasLength(hostName)) { - sb.append('[').append(hostName).append(']'); + sb.append('{').append(hostName).append('}'); } if (address != null) { - sb.append('[').append(address).append(']'); + sb.append('{').append(address).append('}'); } if (!attributes.isEmpty()) { sb.append(attributes); diff --git a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java index de657c07be2..93d4cc3d111 100644 --- a/core/src/main/java/org/elasticsearch/common/logging/Loggers.java +++ b/core/src/main/java/org/elasticsearch/common/logging/Loggers.java @@ -84,6 +84,7 @@ public class Loggers { } } + @SuppressForbidden(reason = "do not know what this method does") public static ESLogger getLogger(String loggerName, Settings settings, String... prefixes) { List prefixesList = newArrayList(); if (settings.getAsBoolean("logger.logHostAddress", false)) { diff --git a/core/src/main/java/org/elasticsearch/common/network/IfConfig.java b/core/src/main/java/org/elasticsearch/common/network/IfConfig.java index 6603ab52ce6..ea33274fad1 100644 --- a/core/src/main/java/org/elasticsearch/common/network/IfConfig.java +++ b/core/src/main/java/org/elasticsearch/common/network/IfConfig.java @@ -114,22 +114,22 @@ final class IfConfig { InetAddress address = interfaceAddress.getAddress(); if (address instanceof Inet6Address) { sb.append("inet6 "); - sb.append(address.toString().substring(1)); + sb.append(NetworkAddress.formatAddress(address)); sb.append(" prefixlen:"); sb.append(interfaceAddress.getNetworkPrefixLength()); } else { sb.append("inet "); - sb.append(address.toString().substring(1)); + sb.append(NetworkAddress.formatAddress(address)); int netmask = 0xFFFFFFFF << (32 - interfaceAddress.getNetworkPrefixLength()); - sb.append(" netmask:" + InetAddress.getByAddress(new byte[] { + sb.append(" netmask:" + NetworkAddress.formatAddress(InetAddress.getByAddress(new byte[] { (byte)(netmask >>> 24), (byte)(netmask >>> 16 & 0xFF), (byte)(netmask >>> 8 & 0xFF), (byte)(netmask & 0xFF) - }).toString().substring(1)); + }))); InetAddress broadcast = interfaceAddress.getBroadcast(); if (broadcast != null) { - sb.append(" broadcast:" + broadcast.toString().substring(1)); + sb.append(" broadcast:" + NetworkAddress.formatAddress(broadcast)); } } if (address.isLoopbackAddress()) { diff --git a/core/src/main/java/org/elasticsearch/common/network/MulticastChannel.java b/core/src/main/java/org/elasticsearch/common/network/MulticastChannel.java index 73d4e305ab9..7932cb3397c 100644 --- a/core/src/main/java/org/elasticsearch/common/network/MulticastChannel.java +++ b/core/src/main/java/org/elasticsearch/common/network/MulticastChannel.java @@ -20,7 +20,9 @@ package org.elasticsearch.common.network; import com.google.common.collect.Maps; + import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLogger; @@ -257,6 +259,7 @@ public abstract class MulticastChannel implements Closeable { /** * Simple implementation of a channel. */ + @SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare") private static class Plain extends MulticastChannel { private final ESLogger logger; private final Config config; diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkAddress.java b/core/src/main/java/org/elasticsearch/common/network/NetworkAddress.java new file mode 100644 index 00000000000..91eda6bb624 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkAddress.java @@ -0,0 +1,183 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.network; + +import com.google.common.net.InetAddresses; + +import org.elasticsearch.common.SuppressForbidden; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Objects; + +/** + * Utility functions for presentation of network addresses. + *

+ * Java's address formatting is particularly bad, every address + * has an optional host if its resolved, so IPv4 addresses often + * look like this (note the confusing leading slash): + *

+ *    {@code /127.0.0.1}
+ * 
+ * IPv6 addresses are even worse, with no IPv6 address compression, + * and often containing things like numeric scopeids, which are even + * more confusing (e.g. not going to work in any user's browser, refer + * to an interface on another machine, etc): + *
+ *    {@code /0:0:0:0:0:0:0:1%1}
+ * 
+ * This class provides sane address formatting instead, e.g. + * {@code 127.0.0.1} and {@code ::1} respectively. No methods do reverse + * lookups. + */ +public final class NetworkAddress { + /** No instantiation */ + private NetworkAddress() {} + + /** + * Formats a network address (with optional host) for display purposes. + *

+ * If the host is already resolved (typically because, we looked up + * a name to do that), then we include it, otherwise it is + * omitted. See {@link #formatAddress(InetAddress)} if you only + * want the address. + *

+ * IPv6 addresses are compressed and without scope + * identifiers. + *

+ * Example output with already-resolved hostnames: + *

+ *

+ * Example output with just an address: + *

+ * @param address IPv4 or IPv6 address + * @return formatted string + * @see #formatAddress(InetAddress) + */ + public static String format(InetAddress address) { + return format(address, -1, true); + } + + /** + * Formats a network address and port for display purposes. + *

+ * If the host is already resolved (typically because, we looked up + * a name to do that), then we include it, otherwise it is + * omitted. See {@link #formatAddress(InetSocketAddress)} if you only + * want the address. + *

+ * This formats the address with {@link #format(InetAddress)} + * and appends the port number. IPv6 addresses will be bracketed. + *

+ * Example output with already-resolved hostnames: + *

+ *

+ * Example output with just an address: + *

+ * @param address IPv4 or IPv6 address with port + * @return formatted string + * @see #formatAddress(InetSocketAddress) + */ + public static String format(InetSocketAddress address) { + return format(address.getAddress(), address.getPort(), true); + } + + /** + * Formats a network address for display purposes. + *

+ * This formats only the address, any hostname information, + * if present, is ignored. IPv6 addresses are compressed + * and without scope identifiers. + *

+ * Example output with just an address: + *

+ * @param address IPv4 or IPv6 address + * @return formatted string + */ + public static String formatAddress(InetAddress address) { + return format(address, -1, false); + } + + /** + * Formats a network address and port for display purposes. + *

+ * This formats the address with {@link #formatAddress(InetAddress)} + * and appends the port number. IPv6 addresses will be bracketed. + * Any host information, if present is ignored. + *

+ * Example output: + *

+ * @param address IPv4 or IPv6 address with port + * @return formatted string + */ + public static String formatAddress(InetSocketAddress address) { + return format(address.getAddress(), address.getPort(), false); + } + + // note, we don't validate port, because we only allow InetSocketAddress + @SuppressForbidden(reason = "we call toString to avoid a DNS lookup") + static String format(InetAddress address, int port, boolean includeHost) { + Objects.requireNonNull(address); + + StringBuilder builder = new StringBuilder(); + + if (includeHost) { + // must use toString, to avoid DNS lookup. but the format is specified in the spec + String toString = address.toString(); + int separator = toString.indexOf('/'); + if (separator > 0) { + // append hostname, with the slash too + builder.append(toString, 0, separator + 1); + } + } + + if (port != -1 && address instanceof Inet6Address) { + builder.append(InetAddresses.toUriString(address)); + } else { + builder.append(InetAddresses.toAddrString(address)); + } + + if (port != -1) { + builder.append(':'); + builder.append(port); + } + + return builder.toString(); + } +} diff --git a/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java b/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java index 07db52b4fbe..29c3f2560fa 100644 --- a/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java +++ b/core/src/main/java/org/elasticsearch/common/transport/InetSocketTransportAddress.java @@ -19,8 +19,10 @@ package org.elasticsearch.common.transport; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.network.NetworkAddress; import java.io.IOException; import java.net.Inet6Address; @@ -32,6 +34,7 @@ import java.net.InetSocketAddress; */ public final class InetSocketTransportAddress implements TransportAddress { + // TODO: do we really need this option, why do resolving? - remove this as a follow-up private static boolean resolveAddress = false; public static void setResolveAddress(boolean resolveAddress) { @@ -53,15 +56,14 @@ public final class InetSocketTransportAddress implements TransportAddress { in.readFully(a); InetAddress inetAddress; if (len == 16) { - int scope_id = in.readInt(); - inetAddress = Inet6Address.getByAddress(null, a, scope_id); + inetAddress = Inet6Address.getByAddress(null, a); } else { inetAddress = InetAddress.getByAddress(a); } int port = in.readInt(); this.address = new InetSocketAddress(inetAddress, port); } else { - this.address = new InetSocketAddress(in.readString(), in.readInt()); + this.address = new InetSocketAddress(InetAddress.getByName(in.readString()), in.readInt()); } } @@ -69,10 +71,6 @@ public final class InetSocketTransportAddress implements TransportAddress { address = null; } - public InetSocketTransportAddress(String hostname, int port) { - this(new InetSocketAddress(hostname, port)); - } - public InetSocketTransportAddress(InetAddress address, int port) { this(new InetSocketAddress(address, port)); } @@ -94,16 +92,12 @@ public final class InetSocketTransportAddress implements TransportAddress { @Override public String getHost() { - if (resolveAddress) { - return address.getHostName(); - } else { - return getAddress(); - } + return maybeLookupHostname(); } @Override public String getAddress() { - return address.getAddress().getHostAddress(); + return NetworkAddress.formatAddress(address.getAddress()); } @Override @@ -127,15 +121,25 @@ public final class InetSocketTransportAddress implements TransportAddress { byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6) out.writeByte((byte) bytes.length); // 1 byte out.write(bytes, 0, bytes.length); - if (address().getAddress() instanceof Inet6Address) - out.writeInt(((Inet6Address) address.getAddress()).getScopeId()); + // don't serialize scope ids over the network!!!! + // these only make sense with respect to the local machine, and will only formulate + // the address incorrectly remotely. } else { out.writeByte((byte) 1); - out.writeString(address.getHostName()); + out.writeString(maybeLookupHostname()); } out.writeInt(address.getPort()); } + @SuppressForbidden(reason = "if explicitly configured we do hostName reverse lookup") // TODO remove this? + private String maybeLookupHostname() { + if (resolveAddress) { + return address.getHostName(); + } else { + return getAddress(); + } + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -151,6 +155,6 @@ public final class InetSocketTransportAddress implements TransportAddress { @Override public String toString() { - return "inet[" + address + "]"; + return NetworkAddress.format(address); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 00fb59e94ba..580f87812e1 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -53,10 +53,10 @@ public class ZenPingService extends AbstractLifecycleComponent implemen Version version, ElectMasterService electMasterService, @Nullable Set unicastHostsProviders) { super(settings); ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); - if (this.settings.getAsBoolean("discovery.zen.ping.multicast.enabled", true)) { + if (this.settings.getAsBoolean("discovery.zen.ping.multicast.enabled", false)) { zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version)); } - // always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast + // always add the unicast hosts, or things get angry! zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders)); this.zenPings = zenPingsBuilder.build(); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index b16b616515c..24ee0f12824 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -64,7 +64,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen public static final String ACTION_NAME = "internal:discovery/zen/unicast"; - public static final int LIMIT_PORTS_COUNT = 1; + // these limits are per-address + public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; + public static final int LIMIT_LOCAL_PORTS_COUNT = 5; + private final ThreadPool threadPool; private final TransportService transportService; @@ -117,15 +120,24 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen hostArr[i] = hostArr[i].trim(); } List hosts = Lists.newArrayList(hostArr); + final int limitPortCounts; + if (hosts.isEmpty()) { + // if unicast hosts are not specified, fill with simple defaults on the local machine + limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; + hosts.addAll(transportService.getLocalAddresses()); + } else { + // we only limit to 1 addresses, makes no sense to ping 100 ports + limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; + } + logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); List configuredTargetNodes = Lists.newArrayList(); for (String host : hosts) { try { - TransportAddress[] addresses = transportService.addressesFromString(host); - // we only limit to 1 addresses, makes no sense to ping 100 ports - for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) { - configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", addresses[i], version.minimumCompatibilityVersion())); + TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts); + for (TransportAddress address : addresses) { + configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", address, version.minimumCompatibilityVersion())); } } catch (Exception e) { throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e); diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 664f7a8d0e4..fa0c1a9fb28 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.netty.NettyUtils; import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -274,7 +275,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(new PortsRange.PortCallback() { @Override public boolean onPortNumber(int portNumber) { @@ -282,7 +283,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent { + public static class TransportSettings { public static final String TRANSPORT_TCP_COMPRESS = "transport.tcp.compress"; } @@ -52,7 +54,7 @@ public interface Transport extends LifecycleComponent { /** * Returns an address from its string representation. */ - TransportAddress[] addressesFromString(String address) throws Exception; + TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception; /** * Is the address type supported. @@ -89,4 +91,6 @@ public interface Transport extends LifecycleComponent { * Returns count of currently open connections */ long serverOpen(); + + List getLocalAddresses(); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportModule.java b/core/src/main/java/org/elasticsearch/transport/TransportModule.java index fd8932b5760..0be84037700 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportModule.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportModule.java @@ -92,7 +92,6 @@ public class TransportModule extends AbstractModule { } bind(NamedWriteableRegistry.class).asEagerSingleton(); - if (configuredTransport != null) { logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource); bind(Transport.class).to(configuredTransport).asEagerSingleton(); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 81599996a6f..b70589ce52d 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -40,10 +40,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; @@ -221,6 +218,10 @@ public class TransportService extends AbstractLifecycleComponent getLocalAddresses() { + return transport.getLocalAddresses(); + } + public boolean nodeConnected(DiscoveryNode node) { return node.equals(localNode) || transport.nodeConnected(node); } @@ -383,8 +384,8 @@ public class TransportService extends AbstractLifecycleComponent implements Transport { public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport"; - private final ThreadPool threadPool; private final ThreadPoolExecutor workers; private final Version version; private volatile TransportServiceAdapter transportServiceAdapter; private volatile BoundTransportAddress boundAddress; private volatile LocalTransportAddress localAddress; - private final static ConcurrentMap transports = newConcurrentMap(); + private final static ConcurrentMap transports = newConcurrentMap(); private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); private final NamedWriteableRegistry namedWriteableRegistry; @@ -78,7 +76,6 @@ public class LocalTransport extends AbstractLifecycleComponent implem super(settings); this.threadPool = threadPool; this.version = version; - int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings)); int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1); logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize); @@ -88,7 +85,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } @Override - public TransportAddress[] addressesFromString(String address) { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) { return new TransportAddress[]{new LocalTransportAddress(address)}; } @@ -359,4 +356,9 @@ public class LocalTransport extends AbstractLifecycleComponent implem logger.error("failed to handle exception response [{}]", t, handler); } } + + @Override + public List getLocalAddresses() { + return Collections.singletonList("0.0.0.0"); + } } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 372c3b42cb6..5d185fc346e 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -22,8 +22,8 @@ package org.elasticsearch.transport.netty; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -42,6 +42,7 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.netty.NettyUtils; import org.elasticsearch.common.netty.OpenChannelsHandler; import org.elasticsearch.common.netty.ReleaseChannelFutureListener; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -75,6 +76,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.UnknownHostException; import java.nio.channels.CancelledKeyException; import java.util.*; import java.util.concurrent.*; @@ -82,6 +84,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.elasticsearch.common.network.NetworkService.TcpSettings.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -118,6 +122,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static final String DEFAULT_PORT_RANGE = "9300-9400"; public static final String DEFAULT_PROFILE = "default"; + private static final List LOCAL_ADDRESSES = Arrays.asList("127.0.0.1", "[::1]"); protected final NetworkService networkService; protected final Version version; @@ -405,7 +410,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e); } if (logger.isDebugEnabled()) { - logger.debug("binding server bootstrap to: {}", hostAddresses); + String[] addresses = new String[hostAddresses.length]; + for (int i = 0; i < hostAddresses.length; i++) { + addresses[i] = NetworkAddress.format(hostAddresses[i]); + } + logger.debug("binding server bootstrap to: {}", addresses); } for (InetAddress hostAddress : hostAddresses) { bindServerBootstrap(name, hostAddress, settings); @@ -417,7 +426,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem String port = settings.get("port"); PortsRange portsRange = new PortsRange(port); final AtomicReference lastException = new AtomicReference<>(); - final AtomicReference boundSocket = new AtomicReference<>(); + final AtomicReference boundSocket = new AtomicReference<>(); boolean success = portsRange.iterate(new PortsRange.PortCallback() { @Override public boolean onPortNumber(int portNumber) { @@ -430,7 +439,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem serverChannels.put(name, list); } list.add(channel); - boundSocket.set(channel.getLocalAddress()); + boundSocket.set((InetSocketAddress)channel.getLocalAddress()); } } catch (Exception e) { lastException.set(e); @@ -444,7 +453,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } if (!DEFAULT_PROFILE.equals(name)) { - InetSocketAddress boundAddress = (InetSocketAddress) boundSocket.get(); + InetSocketAddress boundAddress = boundSocket.get(); int publishPort = settings.getAsInt("publish_port", boundAddress.getPort()); String publishHost = settings.get("publish_host", boundAddress.getHostString()); InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort); @@ -452,7 +461,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem profileBoundAddresses.putIfAbsent(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress))); } - logger.info("Bound profile [{}] to address [{}]", name, boundSocket.get()); + logger.info("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get())); } private void createServerBootstrap(String name, Settings settings) { @@ -582,35 +591,65 @@ public class NettyTransport extends AbstractLifecycleComponent implem } @Override - public TransportAddress[] addressesFromString(String address) throws Exception { - int index = address.indexOf('['); - if (index != -1) { - String host = address.substring(0, index); - Set ports = Strings.commaDelimitedListToSet(address.substring(index + 1, address.indexOf(']'))); - List addresses = Lists.newArrayList(); - for (String port : ports) { - int[] iPorts = new PortsRange(port).ports(); - for (int iPort : iPorts) { - addresses.add(new InetSocketTransportAddress(host, iPort)); - } - } - return addresses.toArray(new TransportAddress[addresses.size()]); + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + return parse(address, settings.get("transport.profiles.default.port", + settings.get("transport.netty.port", + settings.get("transport.tcp.port", + DEFAULT_PORT_RANGE))), perAddressLimit); + } + + // this code is a take on guava's HostAndPort, like a HostAndPortRange + + // pattern for validating ipv6 bracked addresses. + // not perfect, but PortsRange should take care of any port range validation, not a regex + private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$"); + + /** parse a hostname+port range spec into its equivalent addresses */ + static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException { + Objects.requireNonNull(hostPortString); + String host; + String portString = null; + + if (hostPortString.startsWith("[")) { + // Parse a bracketed host, typically an IPv6 literal. + Matcher matcher = BRACKET_PATTERN.matcher(hostPortString); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString); + } + host = matcher.group(1); + portString = matcher.group(2); // could be null } else { - index = address.lastIndexOf(':'); - if (index == -1) { - List addresses = Lists.newArrayList(); - String defaultPort = settings.get("transport.profiles.default.port", settings.get("transport.netty.port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE))); - int[] iPorts = new PortsRange(defaultPort).ports(); - for (int iPort : iPorts) { - addresses.add(new InetSocketTransportAddress(address, iPort)); - } - return addresses.toArray(new TransportAddress[addresses.size()]); - } else { - String host = address.substring(0, index); - int port = Integer.parseInt(address.substring(index + 1)); - return new TransportAddress[]{new InetSocketTransportAddress(host, port)}; + int colonPos = hostPortString.indexOf(':'); + if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) { + // Exactly 1 colon. Split into host:port. + host = hostPortString.substring(0, colonPos); + portString = hostPortString.substring(colonPos + 1); + } else { + // 0 or 2+ colons. Bare hostname or IPv6 literal. + host = hostPortString; + // 2+ colons and not bracketed: exception + if (colonPos >= 0) { + throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString); + } + } + } + + // if port isn't specified, fill with the default + if (portString == null || portString.isEmpty()) { + portString = defaultPortRange; + } + + // generate address for each port in the range + Set addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host))); + List transportAddresses = new ArrayList<>(); + int[] ports = new PortsRange(portString).ports(); + int limit = Math.min(ports.length, perAddressLimit); + for (int i = 0; i < limit; i++) { + for (InetAddress address : addresses) { + transportAddresses.add(new InetSocketTransportAddress(address, ports[i])); } } + return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]); } @Override @@ -673,6 +712,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem return channels == null ? 0 : channels.numberOfOpenChannels(); } + @Override + public List getLocalAddresses() { + return LOCAL_ADDRESSES; + } + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index 7f3ce810c17..84f53d15f46 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.netty.NettyTransport; +import java.net.InetAddress; import java.util.concurrent.CountDownLatch; import static org.elasticsearch.transport.TransportRequestOptions.options; @@ -44,7 +45,7 @@ import static org.elasticsearch.transport.TransportRequestOptions.options; */ public class BenchmarkNettyLargeMessages { - public static void main(String[] args) throws InterruptedException { + public static void main(String[] args) throws Exception { final ByteSizeValue payloadSize = new ByteSizeValue(10, ByteSizeUnit.MB); final int NUMBER_OF_ITERATIONS = 100000; final int NUMBER_OF_CLIENTS = 5; @@ -63,7 +64,7 @@ public class BenchmarkNettyLargeMessages { new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool ).start(); - final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT); + final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300), Version.CURRENT); // final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300)); final DiscoveryNode smallNode = bigNode; diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/netty/NettyEchoBenchmark.java b/core/src/test/java/org/elasticsearch/benchmark/transport/netty/NettyEchoBenchmark.java index 61686ebb7d5..fd76504f2cb 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/netty/NettyEchoBenchmark.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/netty/NettyEchoBenchmark.java @@ -27,13 +27,14 @@ import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; public class NettyEchoBenchmark { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { final int payloadSize = 100; int CYCLE_SIZE = 50000; final long NUMBER_OF_ITERATIONS = 500000; @@ -58,7 +59,7 @@ public class NettyEchoBenchmark { }); // Bind and start to accept incoming connections. - serverBootstrap.bind(new InetSocketAddress(9000)); + serverBootstrap.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9000)); ClientBootstrap clientBootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( @@ -78,7 +79,7 @@ public class NettyEchoBenchmark { }); // Start the connection attempt. - ChannelFuture future = clientBootstrap.connect(new InetSocketAddress("localhost", 9000)); + ChannelFuture future = clientBootstrap.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9000)); future.awaitUninterruptibly(); Channel clientChannel = future.getChannel(); diff --git a/core/src/test/java/org/elasticsearch/bwcompat/UnicastBackwardsCompatibilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/UnicastBackwardsCompatibilityIT.java index f90eae446d5..59dd6695218 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/UnicastBackwardsCompatibilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/UnicastBackwardsCompatibilityIT.java @@ -33,7 +33,6 @@ public class UnicastBackwardsCompatibilityIT extends ESBackcompatTestCase { return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put("transport.tcp.port", 9380 + nodeOrdinal) - .put("discovery.zen.ping.multicast.enabled", false) .put("discovery.zen.ping.unicast.hosts", "localhost:9380,localhost:9381,localhost:9390,localhost:9391") .build(); } @@ -43,7 +42,6 @@ public class UnicastBackwardsCompatibilityIT extends ESBackcompatTestCase { return Settings.settingsBuilder() .put(super.externalNodeSettings(nodeOrdinal)) .put("transport.tcp.port", 9390 + nodeOrdinal) - .put("discovery.zen.ping.multicast.enabled", false) .put("discovery.zen.ping.unicast.hosts", "localhost:9380,localhost:9381,localhost:9390,localhost:9391") .build(); } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index fcf64e05187..95456fda901 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -123,7 +123,7 @@ abstract class FailAndRetryMockTransport imp } @Override - public TransportAddress[] addressesFromString(String address) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index ae107b9496b..a1e10b088ad 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -32,6 +32,8 @@ import org.elasticsearch.transport.*; import org.junit.Test; import java.io.Closeable; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +55,11 @@ public class TransportClientNodesServiceTests extends ESTestCase { TestIteration() { threadPool = new ThreadPool("transport-client-nodes-service-tests"); transport = new FailAndRetryMockTransport(getRandom()) { + @Override + public List getLocalAddresses() { + return Collections.EMPTY_LIST; + } + @Override protected TestResponse newResponse() { return new TestResponse(); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java index 307cc0dbe16..42fa3fc204d 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java @@ -60,7 +60,7 @@ public class TransportClientRetryIT extends ESIntegTestCase { Settings.Builder builder = settingsBuilder().put("client.transport.nodes_sampler_interval", "1s") .put("name", "transport_client_retry_test") - .put("node.mode", InternalTestCluster.nodeMode()) + .put("node.mode", internalCluster().getNodeMode()) .put(ClusterName.SETTING, internalCluster().getClusterName()) .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true) .put("path.home", createTempDir()); diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java index 71c2c2941a9..9776f667dc9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterServiceIT.java @@ -66,6 +66,7 @@ import static org.hamcrest.Matchers.notNullValue; * */ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class ClusterServiceIT extends ESIntegTestCase { @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index 6939df28dcd..a00679d05be 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -44,6 +44,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class MinimumMasterNodesIT extends ESIntegTestCase { @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index de0952e5c63..28ef5e195a2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -57,6 +57,7 @@ import static org.hamcrest.Matchers.lessThan; /** */ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class NoMasterNodeIT extends ESIntegTestCase { @Test diff --git a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java index 2876f7911e2..43b403d306c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java @@ -34,6 +34,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class SpecificMasterNodesIT extends ESIntegTestCase { protected final Settings.Builder settingsBuilder() { diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkAddressTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkAddressTests.java new file mode 100644 index 00000000000..4ccc9f71660 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkAddressTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.network; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * Tests for network address formatting. Please avoid using any methods that cause DNS lookups! + */ +public class NetworkAddressTests extends ESTestCase { + + public void testFormatV4() throws Exception { + assertEquals("localhost/127.0.0.1", NetworkAddress.format(forge("localhost", "127.0.0.1"))); + assertEquals("127.0.0.1", NetworkAddress.format(forge(null, "127.0.0.1"))); + } + + public void testFormatV6() throws Exception { + assertEquals("localhost/::1", NetworkAddress.format(forge("localhost", "::1"))); + assertEquals("::1", NetworkAddress.format(forge(null, "::1"))); + } + + public void testFormatAddressV4() throws Exception { + assertEquals("127.0.0.1", NetworkAddress.formatAddress(forge("localhost", "127.0.0.1"))); + assertEquals("127.0.0.1", NetworkAddress.formatAddress(forge(null, "127.0.0.1"))); + } + + public void testFormatAddressV6() throws Exception { + assertEquals("::1", NetworkAddress.formatAddress(forge("localhost", "::1"))); + assertEquals("::1", NetworkAddress.formatAddress(forge(null, "::1"))); + } + + public void testFormatPortV4() throws Exception { + assertEquals("localhost/127.0.0.1:1234", NetworkAddress.format(new InetSocketAddress(forge("localhost", "127.0.0.1"), 1234))); + assertEquals("127.0.0.1:1234", NetworkAddress.format(new InetSocketAddress(forge(null, "127.0.0.1"), 1234))); + } + + public void testFormatPortV6() throws Exception { + assertEquals("localhost/[::1]:1234", NetworkAddress.format(new InetSocketAddress(forge("localhost", "::1"), 1234))); + assertEquals("[::1]:1234",NetworkAddress.format(new InetSocketAddress(forge(null, "::1"), 1234))); + } + + public void testFormatAddressPortV4() throws Exception { + assertEquals("127.0.0.1:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge("localhost", "127.0.0.1"), 1234))); + assertEquals("127.0.0.1:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge(null, "127.0.0.1"), 1234))); + } + + public void testFormatAddressPortV6() throws Exception { + assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge("localhost", "::1"), 1234))); + assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge(null, "::1"), 1234))); + } + + public void testNoScopeID() throws Exception { + assertEquals("::1", NetworkAddress.format(forgeScoped(null, "::1", 5))); + assertEquals("localhost/::1", NetworkAddress.format(forgeScoped("localhost", "::1", 5))); + + assertEquals("::1", NetworkAddress.formatAddress(forgeScoped(null, "::1", 5))); + assertEquals("::1", NetworkAddress.formatAddress(forgeScoped("localhost", "::1", 5))); + + assertEquals("[::1]:1234", NetworkAddress.format(new InetSocketAddress(forgeScoped(null, "::1", 5), 1234))); + assertEquals("localhost/[::1]:1234", NetworkAddress.format(new InetSocketAddress(forgeScoped("localhost", "::1", 5), 1234))); + + assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forgeScoped(null, "::1", 5), 1234))); + assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forgeScoped("localhost", "::1", 5), 1234))); + } + + /** creates address without any lookups. hostname can be null, for missing */ + private InetAddress forge(String hostname, String address) throws IOException { + if (hostname == null) { + return InetAddress.getByName(address); + } else { + byte bytes[] = InetAddress.getByName(address).getAddress(); + return InetAddress.getByAddress(hostname, bytes); + } + } + + /** creates scoped ipv6 address without any lookups. hostname can be null, for missing */ + private InetAddress forgeScoped(String hostname, String address, int scopeid) throws IOException { + byte bytes[] = InetAddress.getByName(address).getAddress(); + if (hostname == null) { + return Inet6Address.getByAddress(hostname, bytes); + } else { + return Inet6Address.getByAddress(hostname, bytes, scopeid); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 84b206dec68..c392e4a15e1 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -72,6 +72,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.hamcrest.Matchers.*; @ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ESIntegTestCase.SuppressLocalMode public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places. @@ -142,12 +143,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { .put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out .put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly .put("http.enabled", false) // just to make test quicker + .put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default + .put("transport.bind_host", "127.0.0.1") + .put("transport.publish_host", "127.0.0.1") .put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out .put("plugin.types", MockTransportService.TestPlugin.class.getName()) .build(); private void configureCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException { - if (randomBoolean()) { + if (randomBoolean() && canUseMuticast()) { configureMulticastCluster(numberOfNodes, minimumMasterNode); } else { configureUnicastCluster(numberOfNodes, null, minimumMasterNode); @@ -159,10 +163,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } + logger.info("---> configured multicast"); // TODO: Rarely use default settings form some of these Settings settings = Settings.builder() .put(DEFAULT_SETTINGS) + .put("discovery.zen.ping.multicast.enabled", true) .put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, minimumMasterNode) + .put() .build(); if (discoveryConfig == null) { @@ -174,6 +181,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { if (minimumMasterNode < 0) { minimumMasterNode = numberOfNodes / 2 + 1; } + logger.info("---> configured unicast"); // TODO: Rarely use default settings form some of these Settings nodeSettings = Settings.builder() .put(DEFAULT_SETTINGS) diff --git a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java index 891d538a679..87c1204b8b6 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryIT.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.equalTo; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class ZenUnicastDiscoveryIT extends ESIntegTestCase { private ClusterDiscoveryConfiguration discoveryConfig; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java index 943b0549a27..cc293375a2c 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; @@ -41,15 +42,13 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.BytesTransportRequest; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import org.hamcrest.Matchers; import org.junit.Test; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -66,6 +65,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +@ESIntegTestCase.SuppressLocalMode public class ZenDiscoveryIT extends ESIntegTestCase { @Test @@ -226,15 +226,14 @@ public class ZenDiscoveryIT extends ESIntegTestCase { } @Test - public void testHandleNodeJoin_incompatibleMinVersion() { + public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException { Settings nodeSettings = Settings.settingsBuilder() .put("discovery.type", "zen") // <-- To override the local setting if set externally - .put("node.mode", "local") // <-- force local transport so we can fake a network address .build(); String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0_beta1); ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName); - DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0); + DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), Version.V_1_6_0); final AtomicReference holder = new AtomicReference<>(); zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() { @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java index 00da8b1d54b..c55f3404f00 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/multicast/MulticastZenPingIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -32,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.zen.ping.PingContextProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -44,12 +46,14 @@ import java.net.MulticastSocket; import static org.hamcrest.Matchers.equalTo; +@ESIntegTestCase.Multicast public class MulticastZenPingIT extends ESTestCase { private Settings buildRandomMulticast(Settings settings) { Settings.Builder builder = Settings.builder().put(settings); builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255)); builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000)); + builder.put("discovery.zen.ping.multicast.enabled", true); if (randomBoolean()) { builder.put("discovery.zen.ping.multicast.shared", randomBoolean()); } @@ -129,7 +133,7 @@ public class MulticastZenPingIT extends ESTestCase { } } - @Test + @Test @SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare") public void testExternalPing() throws Exception { Settings settings = Settings.EMPTY; settings = buildRandomMulticast(settings); diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java index 83d31450088..509740ee575 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -39,6 +40,8 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; import org.junit.Test; +import java.net.InetSocketAddress; + import static org.hamcrest.Matchers.equalTo; public class UnicastZenPingIT extends ESTestCase { @@ -68,8 +71,8 @@ public class UnicastZenPingIT extends ESTestCase { InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress(); Settings hostsSettings = Settings.settingsBuilder().putArray("discovery.zen.ping.unicast.hosts", - addressA.address().getAddress().getHostAddress() + ":" + addressA.address().getPort(), - addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) + NetworkAddress.formatAddress(new InetSocketAddress(addressA.address().getAddress(), addressA.address().getPort())), + NetworkAddress.formatAddress(new InetSocketAddress(addressB.address().getAddress(), addressB.address().getPort()))) .build(); UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null); diff --git a/core/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java b/core/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java index b6a8df5b696..f0368fba60b 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java +++ b/core/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.http.netty.pipelining; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.test.ESTestCase; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -32,6 +33,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -57,7 +59,7 @@ public class HttpPipeliningHandlerTest extends ESTestCase { private static final long CONNECTION_TIMEOUT = 10000L; private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8"; // TODO make me random - private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080); + private static final InetSocketAddress HOST_ADDR = new InetSocketAddress(InetAddress.getLoopbackAddress(), 9080); private static final String PATH1 = "/1"; private static final String PATH2 = "/2"; private static final String SOME_RESPONSE_TEXT = "some response for "; @@ -123,13 +125,14 @@ public class HttpPipeliningHandlerTest extends ESTestCase { assertTrue(connectionFuture.await(CONNECTION_TIMEOUT)); final Channel clientChannel = connectionFuture.getChannel(); + // NetworkAddress.formatAddress makes a proper HOST header. final HttpRequest request1 = new DefaultHttpRequest( HTTP_1_1, HttpMethod.GET, PATH1); - request1.headers().add(HOST, HOST_ADDR.toString()); + request1.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR)); final HttpRequest request2 = new DefaultHttpRequest( HTTP_1_1, HttpMethod.GET, PATH2); - request2.headers().add(HOST, HOST_ADDR.toString()); + request2.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR)); clientChannel.write(request1); clientChannel.write(request2); diff --git a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java index 77250476aa9..bdbcd45c7e4 100644 --- a/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java +++ b/core/src/test/java/org/elasticsearch/index/TransportIndexFailuresIT.java @@ -48,6 +48,7 @@ import static org.hamcrest.Matchers.equalTo; * Test failure when index replication actions fail mid-flight */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ESIntegTestCase.SuppressLocalMode public class TransportIndexFailuresIT extends ESIntegTestCase { private static final Settings nodeSettings = Settings.settingsBuilder() diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c75f2faa8b1..a1b25087570 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -64,6 +64,7 @@ import static org.hamcrest.Matchers.*; /** */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0) +@ESIntegTestCase.SuppressLocalMode public class RareClusterStateIT extends ESIntegTestCase { @Override diff --git a/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java b/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java index 07904389d3c..bbeeac122f9 100644 --- a/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java +++ b/core/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.threadpool.ThreadPool; @@ -49,6 +50,7 @@ public class PluggableTransportModuleIT extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return settingsBuilder() .put(super.nodeSettings(nodeOrdinal)) + .put(DiscoveryModule.DISCOVERY_TYPE_KEY, "local") .put("plugin.types", CountingSentRequestsPlugin.class.getName()) .build(); } diff --git a/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java b/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java index e4d13b677b2..ff0786e7f9c 100644 --- a/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java +++ b/core/src/test/java/org/elasticsearch/plugins/PluginManagerIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.plugins; import com.google.common.base.Charsets; import com.google.common.hash.Hashing; + import org.apache.http.impl.client.HttpClients; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; @@ -51,8 +52,10 @@ import org.junit.Test; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; + import java.io.BufferedWriter; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; @@ -605,7 +608,7 @@ public class PluginManagerIT extends ESIntegTestCase { } }); - Channel channel = serverBootstrap.bind(new InetSocketAddress("localhost", 0)); + Channel channel = serverBootstrap.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 0)); int port = ((InetSocketAddress) channel.getLocalAddress()).getPort(); // IO_ERROR because there is no real file delivered... assertStatus(String.format(Locale.ROOT, "install https://user:pass@localhost:%s/foo.zip --verbose --timeout 1s", port), ExitStatus.IO_ERROR); @@ -645,7 +648,6 @@ public class PluginManagerIT extends ESIntegTestCase { private Tuple buildInitialSettings() throws IOException { Settings settings = settingsBuilder() - .put("discovery.zen.ping.multicast.enabled", false) .put("http.enabled", true) .put("path.home", createTempDir()).build(); return InternalSettingsPreparer.prepareSettings(settings, false); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index cea3233b03d..75c72162ab9 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -63,8 +63,10 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction; import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction; import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.transport.TransportModule; import org.junit.Test; import java.io.IOException; @@ -96,7 +98,8 @@ import static org.hamcrest.Matchers.nullValue; /** */ -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +@ESIntegTestCase.SuppressLocalMode // TODO only restorePersistentSettingsTest needs this maybe factor out? public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { @Test diff --git a/core/src/test/java/org/elasticsearch/stresstest/client/ClientFailover.java b/core/src/test/java/org/elasticsearch/stresstest/client/ClientFailover.java index fc56f128ede..946685190a6 100644 --- a/core/src/test/java/org/elasticsearch/stresstest/client/ClientFailover.java +++ b/core/src/test/java/org/elasticsearch/stresstest/client/ClientFailover.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; +import java.net.InetAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -42,9 +43,9 @@ public class ClientFailover { // TODO: what is this? a public static void main test?!?! final TransportClient client = TransportClient.builder().build() - .addTransportAddress(new InetSocketTransportAddress("localhost", 9300)) - .addTransportAddress(new InetSocketTransportAddress("localhost", 9301)) - .addTransportAddress(new InetSocketTransportAddress("localhost", 9302)); + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300)) + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9301)) + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9302)); final AtomicBoolean done = new AtomicBoolean(); final AtomicLong indexed = new AtomicLong(); diff --git a/core/src/test/java/org/elasticsearch/stresstest/manyindices/ManyIndicesRemoteStressTest.java b/core/src/test/java/org/elasticsearch/stresstest/manyindices/ManyIndicesRemoteStressTest.java index 591f5ce3090..1917fd6a0b0 100644 --- a/core/src/test/java/org/elasticsearch/stresstest/manyindices/ManyIndicesRemoteStressTest.java +++ b/core/src/test/java/org/elasticsearch/stresstest/manyindices/ManyIndicesRemoteStressTest.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; +import java.net.InetAddress; import java.util.Date; /** @@ -49,7 +50,7 @@ public class ManyIndicesRemoteStressTest { Node node = null; // TODO: what is this? a public static void main test?!?!?! if (true) { - client = TransportClient.builder().settings(Settings.EMPTY).build().addTransportAddress(new InetSocketTransportAddress("localhost", 9300)); + client = TransportClient.builder().settings(Settings.EMPTY).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300)); } else { node = NodeBuilder.nodeBuilder().client(true).node(); client = node.client(); diff --git a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java index 80a60ffbd20..f29972f9c24 100644 --- a/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/core/src/test/java/org/elasticsearch/test/ESIntegTestCase.java @@ -32,6 +32,7 @@ import org.apache.http.impl.client.HttpClients; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.index.shard.MergeSchedulerConfig; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; @@ -117,6 +118,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.test.client.RandomizingClient; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; +import org.elasticsearch.transport.TransportModule; import org.hamcrest.Matchers; import org.joda.time.DateTimeZone; import org.junit.*; @@ -124,7 +126,9 @@ import org.junit.*; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.*; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -1553,49 +1557,51 @@ public abstract class ESIntegTestCase extends ESTestCase { assertThat(clearResponse.isSucceeded(), equalTo(true)); } - private static ClusterScope getAnnotation(Class clazz) { + private static A getAnnotation(Class clazz, Class annotationClass) { if (clazz == Object.class || clazz == ESIntegTestCase.class) { return null; } - ClusterScope annotation = clazz.getAnnotation(ClusterScope.class); + A annotation = clazz.getAnnotation(annotationClass); if (annotation != null) { return annotation; } - return getAnnotation(clazz.getSuperclass()); + return getAnnotation(clazz.getSuperclass(), annotationClass); } + + private Scope getCurrentClusterScope() { return getCurrentClusterScope(this.getClass()); } private static Scope getCurrentClusterScope(Class clazz) { - ClusterScope annotation = getAnnotation(clazz); + ClusterScope annotation = getAnnotation(clazz, ClusterScope.class); // if we are not annotated assume suite! return annotation == null ? Scope.SUITE : annotation.scope(); } private int getNumDataNodes() { - ClusterScope annotation = getAnnotation(this.getClass()); + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null ? -1 : annotation.numDataNodes(); } private int getMinNumDataNodes() { - ClusterScope annotation = getAnnotation(this.getClass()); + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes(); } private int getMaxNumDataNodes() { - ClusterScope annotation = getAnnotation(this.getClass()); + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null || annotation.maxNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES : annotation.maxNumDataNodes(); } private int getNumClientNodes() { - ClusterScope annotation = getAnnotation(this.getClass()); + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null ? InternalTestCluster.DEFAULT_NUM_CLIENT_NODES : annotation.numClientNodes(); } private boolean randomDynamicTemplates() { - ClusterScope annotation = getAnnotation(this.getClass()); + ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); return annotation == null || annotation.randomDynamicTemplates(); } @@ -1607,7 +1613,7 @@ public abstract class ESIntegTestCase extends ESTestCase { * In other words subclasses must ensure this method is idempotent. */ protected Settings nodeSettings(int nodeOrdinal) { - return settingsBuilder() + Settings.Builder builder = settingsBuilder() // Default the watermarks to absurdly low to prevent the tests // from failing on nodes without enough disk space .put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "1b") @@ -1615,8 +1621,8 @@ public abstract class ESIntegTestCase extends ESTestCase { .put("script.indexed", "on") .put("script.inline", "on") // wait short time for other active shards before actually deleting, default 30s not needed in tests - .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS)) - .build(); + .put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS)); + return builder.build(); } /** @@ -1629,7 +1635,7 @@ public abstract class ESIntegTestCase extends ESTestCase { return Settings.EMPTY; } - private ExternalTestCluster buildExternalCluster(String clusterAddresses) { + private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws UnknownHostException { String[] stringAddresses = clusterAddresses.split(","); TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length]; int i = 0; @@ -1639,7 +1645,7 @@ public abstract class ESIntegTestCase extends ESTestCase { throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid"); } try { - transportAddresses[i++] = new InetSocketTransportAddress(split[0], Integer.valueOf(split[1])); + transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1])); } catch (NumberFormatException e) { throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]"); } @@ -1693,7 +1699,18 @@ public abstract class ESIntegTestCase extends ESTestCase { minNumDataNodes = getMinNumDataNodes(); maxNumDataNodes = getMaxNumDataNodes(); } - return new InternalTestCluster(seed, createTempDir(), minNumDataNodes, maxNumDataNodes, + SuppressLocalMode noLocal = getAnnotation(this.getClass(), SuppressLocalMode.class); + SuppressNetworkMode noNetwork = getAnnotation(this.getClass(), SuppressNetworkMode.class); + String nodeMode = InternalTestCluster.configuredNodeMode(); + if (noLocal != null && noNetwork != null) { + throw new IllegalStateException("Can't suppress both network and local mode"); + } else if (noLocal != null){ + nodeMode = "network"; + } else if (noNetwork != null) { + nodeMode = "local"; + } + + return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes, InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", settingsSource, getNumClientNodes(), InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix); } @@ -1715,7 +1732,7 @@ public abstract class ESIntegTestCase extends ESTestCase { * return a random ratio in the interval [0..1] */ protected double getPerTestTransportClientRatio() { - final ClusterScope annotation = getAnnotation(this.getClass()); + final ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class); double perTestRatio = -1; if (annotation != null) { perTestRatio = annotation.transportClientRatio(); @@ -1947,7 +1964,7 @@ public abstract class ESIntegTestCase extends ESTestCase { TransportAddress publishAddress = randomFrom(nodes).getHttp().address().publishAddress(); assertEquals(1, publishAddress.uniqueAddressTypeId()); InetSocketAddress address = ((InetSocketTransportAddress) publishAddress).address(); - return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort()); + return new HttpRequestBuilder(HttpClients.createDefault()).host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort()); } /** @@ -1973,4 +1990,39 @@ public abstract class ESIntegTestCase extends ESTestCase { @Inherited public @interface SuiteScopeTestCase { } + + /** + * If used the test will never run in local mode. + */ + @Retention(RetentionPolicy.RUNTIME) + @Inherited + public @interface SuppressLocalMode {} + + /** + * If used the test will never run in network mode + */ + @Retention(RetentionPolicy.RUNTIME) + @Inherited + public @interface SuppressNetworkMode {} + + /** + * Annotation used to set if working multicast is required to run the test. + * By default, tests annotated with @Multicast won't be executed. + * Set -Dtests.multicast=true when running test to launch multicast tests + */ + @Retention(RetentionPolicy.RUNTIME) + @Inherited + @TestGroup(enabled = false, sysProperty = "tests.multicast") + public @interface Multicast { + } + + + /** + * Returns true if tests can use multicast. Default is false. + * To disable an entire test use {@link org.elasticsearch.test.ESIntegTestCase.Multicast} instead + */ + protected boolean canUseMuticast() { + return Boolean.parseBoolean(System.getProperty("tests.multicast", "false")); + } + } diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 31ddd9699a9..783021a0429 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -192,8 +192,6 @@ public final class InternalTestCluster extends TestCluster { static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true; - public static final String NODE_MODE = nodeMode(); - /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ private final NavigableMap nodes = new TreeMap<>(); @@ -227,16 +225,21 @@ public final class InternalTestCluster extends TestCluster { private final Path baseDir; private ServiceDisruptionScheme activeDisruptionScheme; + private String nodeMode; - public InternalTestCluster(long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, + public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, boolean enableHttpPipelining, String nodePrefix) { - this(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableHttpPipelining, nodePrefix); + this(nodeMode, clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableHttpPipelining, nodePrefix); } - public InternalTestCluster(long clusterSeed, Path baseDir, + public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes, boolean enableHttpPipelining, String nodePrefix) { super(clusterSeed); + if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) { + throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode); + } + this.nodeMode = nodeMode; this.baseDir = baseDir; this.clusterName = clusterName; if (minNumDataNodes < 0 || maxNumDataNodes < 0) { @@ -300,7 +303,7 @@ public final class InternalTestCluster extends TestCluster { builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT + 100)); builder.put("http.port", BASE_PORT + 101 + "-" + (BASE_PORT + 200)); builder.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true); - builder.put("node.mode", NODE_MODE); + builder.put("node.mode", nodeMode); builder.put("http.pipelining", enableHttpPipelining); if (Strings.hasLength(System.getProperty("es.logger.level"))) { builder.put("logger.level", System.getProperty("es.logger.level")); @@ -327,7 +330,7 @@ public final class InternalTestCluster extends TestCluster { executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName)); } - public static String nodeMode() { + public static String configuredNodeMode() { Builder builder = Settings.builder(); if (Strings.isEmpty(System.getProperty("es.node.mode")) && Strings.isEmpty(System.getProperty("es.node.local"))) { return "local"; // default if nothing is specified @@ -354,11 +357,8 @@ public final class InternalTestCluster extends TestCluster { return nodes.keySet().toArray(Strings.EMPTY_ARRAY); } - private static boolean isLocalTransportConfigured() { - if ("local".equals(System.getProperty("es.node.mode", "network"))) { - return true; - } - return Boolean.parseBoolean(System.getProperty("es.node.local", "false")); + private boolean isLocalTransportConfigured() { + return "local".equals(nodeMode); } private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) { @@ -378,7 +378,7 @@ public final class InternalTestCluster extends TestCluster { return builder.build(); } - private static Settings getRandomNodeSettings(long seed) { + private Settings getRandomNodeSettings(long seed) { Random random = new Random(seed); Builder builder = Settings.settingsBuilder() .put(SETTING_CLUSTER_NODE_SEED, seed); @@ -782,6 +782,10 @@ public final class InternalTestCluster extends TestCluster { } } + public String getNodeMode() { + return nodeMode; + } + private final class NodeAndClient implements Closeable { private Node node; private Client nodeClient; @@ -844,7 +848,7 @@ public final class InternalTestCluster extends TestCluster { /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down. * we first need support of transportClientRatio as annotations or so */ - return transportClient = new TransportClientFactory(false, settingsSource.transportClient(), baseDir).client(node, clusterName); + return transportClient = new TransportClientFactory(false, settingsSource.transportClient(), baseDir, nodeMode).client(node, clusterName); } void resetClient() throws IOException { @@ -901,11 +905,13 @@ public final class InternalTestCluster extends TestCluster { private final boolean sniff; private final Settings settings; private final Path baseDir; + private final String nodeMode; - TransportClientFactory(boolean sniff, Settings settings, Path baseDir) { + TransportClientFactory(boolean sniff, Settings settings, Path baseDir, String nodeMode) { this.sniff = sniff; this.settings = settings != null ? settings : Settings.EMPTY; this.baseDir = baseDir; + this.nodeMode = nodeMode; } public Client client(Node node, String clusterName) { @@ -916,7 +922,7 @@ public final class InternalTestCluster extends TestCluster { .put("path.home", baseDir) .put("name", TRANSPORT_CLIENT_PREFIX + node.settings().get("name")) .put(ClusterName.SETTING, clusterName).put("client.transport.sniff", sniff) - .put("node.mode", nodeSettings.get("node.mode", NODE_MODE)) + .put("node.mode", nodeSettings.get("node.mode", nodeMode)) .put("node.local", nodeSettings.get("node.local", "")) .put("logger.prefix", nodeSettings.get("logger.prefix", "")) .put("logger.level", nodeSettings.get("logger.level", "INFO")) diff --git a/core/src/test/java/org/elasticsearch/test/TestCluster.java b/core/src/test/java/org/elasticsearch/test/TestCluster.java index 593ceee9bb4..4f95fc99447 100644 --- a/core/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/TestCluster.java @@ -211,4 +211,6 @@ public abstract class TestCluster implements Iterable, Closeable { * Returns the cluster name */ public abstract String getClusterName(); + + } diff --git a/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index 79f51594daf..b7d0c2dd930 100644 --- a/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -21,6 +21,7 @@ package org.elasticsearch.test.discovery; import com.carrotsearch.randomizedtesting.RandomizedTest; import com.google.common.primitives.Ints; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.InternalTestCluster; @@ -28,15 +29,17 @@ import org.elasticsearch.test.SettingsSource; import org.elasticsearch.transport.local.LocalTransport; import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.util.ArrayList; import java.util.HashSet; import java.util.Set; public class ClusterDiscoveryConfiguration extends SettingsSource { static Settings DEFAULT_NODE_SETTINGS = Settings.settingsBuilder().put("discovery.type", "zen").build(); + private static final String IP_ADDR = "127.0.0.1"; final int numOfNodes; final Settings nodeSettings; @@ -63,29 +66,15 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { // this variable is incremented on each bind attempt and will maintain the next port that should be tried private static int nextPort = calcBasePort(); - // since we run multiple test iterations, we need some flexibility in the choice of ports a node can have (port may - // stay in use by previous iterations on some OSes - read CentOs). This controls the amount of ports each node - // is assigned. All ports in range will be added to the unicast hosts, which is OK because we know only one will be used. - private static final int NUM_PORTS_PER_NODE = 3; - - private final String[] unicastHosts; - private final boolean localMode; - - public UnicastZen(int numOfNodes) { - this(numOfNodes, numOfNodes); - } + private final int[] unicastHostOrdinals; + private final int[] unicastHostPorts; public UnicastZen(int numOfNodes, Settings extraSettings) { this(numOfNodes, numOfNodes, extraSettings); } - public UnicastZen(int numOfNodes, int numOfUnicastHosts) { - this(numOfNodes, numOfUnicastHosts, Settings.EMPTY); - } - public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) { super(numOfNodes, extraSettings); - int[] unicastHostOrdinals; if (numOfUnicastHosts == numOfNodes) { unicastHostOrdinals = new int[numOfNodes]; for (int i = 0; i < numOfNodes; i++) { @@ -98,8 +87,8 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { } unicastHostOrdinals = Ints.toArray(ordinals); } - this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); - this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode); + this.unicastHostPorts = unicastHostPorts(numOfNodes); + assert unicastHostOrdinals.length <= unicastHostPorts.length; } public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) { @@ -108,59 +97,70 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) { super(numOfNodes, extraSettings); - this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); - this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode); + this.unicastHostOrdinals = unicastHostOrdinals; + this.unicastHostPorts = unicastHostPorts(numOfNodes); + assert unicastHostOrdinals.length <= unicastHostPorts.length; } private static int calcBasePort() { return 30000 + InternalTestCluster.BASE_PORT; } - private static String[] buildUnicastHostSetting(int[] unicastHostOrdinals, boolean localMode) { - ArrayList unicastHosts = new ArrayList<>(); - for (int i = 0; i < unicastHostOrdinals.length; i++) { - final int hostOrdinal = unicastHostOrdinals[i]; - if (localMode) { - unicastHosts.add("node_" + hostOrdinal); - } else { - // we need to pin the node port & host so we'd know where to point things - final int[] ports = nodePorts(hostOrdinal); - for (int port : ports) { - unicastHosts.add("localhost:" + port); - } - } - } - return unicastHosts.toArray(new String[unicastHosts.size()]); - } - @Override public Settings node(int nodeOrdinal) { - Settings.Builder builder = Settings.builder() - .put("discovery.zen.ping.multicast.enabled", false); + Settings.Builder builder = Settings.builder(); - if (localMode) { - builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal); + String[] unicastHosts = new String[unicastHostOrdinals.length]; + if (nodeOrdinal >= unicastHostPorts.length) { + throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]"); } else { // we need to pin the node port & host so we'd know where to point things - String ports = ""; - for (int port : nodePorts(nodeOrdinal)) { - ports += "," + port; + builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]); + builder.put("transport.host", IP_ADDR); // only bind on one IF we use v4 here by default + builder.put("transport.bind_host", IP_ADDR); + builder.put("transport.publish_host", IP_ADDR); + builder.put("http.enabled", false); + for (int i = 0; i < unicastHostOrdinals.length; i++) { + unicastHosts[i] = IP_ADDR + ":" + (unicastHostPorts[unicastHostOrdinals[i]]); } - builder.put("transport.tcp.port", ports.substring(1)); - builder.put("transport.host", "localhost"); } builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); return builder.put(super.node(nodeOrdinal)).build(); } - protected static int[] nodePorts(int nodeOridnal) { - int[] unicastHostPorts = new int[NUM_PORTS_PER_NODE]; + @SuppressForbidden(reason = "we know we pass a IP address") + protected synchronized static int[] unicastHostPorts(int numHosts) { + int[] unicastHostPorts = new int[numHosts]; - final int basePort = calcBasePort() + nodeOridnal * NUM_PORTS_PER_NODE; + final int basePort = calcBasePort(); + final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM; + int tries = 0; for (int i = 0; i < unicastHostPorts.length; i++) { - unicastHostPorts[i] = basePort + i; - } + boolean foundPortInRange = false; + while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) { + try (ServerSocket serverSocket = new ServerSocket()) { + // Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it. + serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress()); + serverSocket.bind(new InetSocketAddress(IP_ADDR, nextPort)); + // bind was a success + foundPortInRange = true; + unicastHostPorts[i] = nextPort; + } catch (IOException e) { + // Do nothing + } + nextPort++; + if (nextPort >= maxPort) { + // Roll back to the beginning of the range and do not go into another JVM's port range + nextPort = basePort; + } + tries++; + } + + if (!foundPortInRange) { + throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports"); + } + } return unicastHostPorts; } } diff --git a/core/src/test/java/org/elasticsearch/test/rest/client/RestClient.java b/core/src/test/java/org/elasticsearch/test/rest/client/RestClient.java index 97854ff9510..f18f9201d1a 100644 --- a/core/src/test/java/org/elasticsearch/test/rest/client/RestClient.java +++ b/core/src/test/java/org/elasticsearch/test/rest/client/RestClient.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; import org.elasticsearch.test.rest.client.http.HttpResponse; @@ -247,7 +248,7 @@ public class RestClient implements Closeable { return new HttpRequestBuilder(httpClient) .addHeaders(headers) .protocol(protocol) - .host(address.getHostName()).port(address.getPort()); + .host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort()); } protected HttpRequestBuilder httpRequestBuilder() { diff --git a/core/src/test/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java b/core/src/test/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java index 09f79a0fc7c..7791cda7841 100644 --- a/core/src/test/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java +++ b/core/src/test/java/org/elasticsearch/test/rest/client/http/HttpRequestBuilder.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.support.Headers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.http.HttpServerTransport; @@ -77,7 +78,7 @@ public class HttpRequestBuilder { public HttpRequestBuilder httpTransport(HttpServerTransport httpServerTransport) { InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().publishAddress(); - return host(transportAddress.address().getHostName()).port(transportAddress.address().getPort()); + return host(NetworkAddress.formatAddress(transportAddress.address().getAddress())).port(transportAddress.address().getPort()); } public HttpRequestBuilder port(int port) { diff --git a/core/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/core/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 97854f408c4..16e3c702580 100644 --- a/core/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/core/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -55,8 +55,8 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); Path baseDir = createTempDir(); - InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); - InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); + InternalTestCluster cluster0 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); + InternalTestCluster cluster1 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); assertClusters(cluster0, cluster1, true); } @@ -99,8 +99,8 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = "foobar"; Path baseDir = createTempDir(); - InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); - InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); + InternalTestCluster cluster0 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); + InternalTestCluster cluster1 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix); assertClusters(cluster0, cluster1, false); long seed = randomLong(); diff --git a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java index 8cb1f620c3a..476b89aa1a9 100644 --- a/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/core/src/test/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -28,10 +28,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.transport.*; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.BlockingQueue; /** A transport class that doesn't send anything but rather captures all requests for inspection from tests */ @@ -114,7 +111,8 @@ public class CapturingTransport implements Transport { } @Override - public TransportAddress[] addressesFromString(String address) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + // WTF return new TransportAddress[0]; } @@ -177,4 +175,9 @@ public class CapturingTransport implements Transport { public void close() { } + + @Override + public List getLocalAddresses() { + return Collections.EMPTY_LIST; + } } diff --git a/core/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/core/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index 0aa2e3487fa..f5da216da5d 100644 --- a/core/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/core/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -362,8 +362,8 @@ public class MockTransportService extends TransportService { } @Override - public TransportAddress[] addressesFromString(String address) throws Exception { - return transport.addressesFromString(address); + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + return transport.addressesFromString(address, perAddressLimit); } @Override @@ -401,6 +401,11 @@ public class MockTransportService extends TransportService { return transport.serverOpen(); } + @Override + public List getLocalAddresses() { + return transport.getLocalAddresses(); + } + @Override public Lifecycle.State lifecycleState() { return transport.lifecycleState(); diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 3ffc945bf2f..5f80aa357b6 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.transport; import com.google.common.base.Charsets; + import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -39,6 +40,7 @@ import org.junit.Test; import java.io.BufferedReader; import java.io.InputStreamReader; +import java.net.InetAddress; import java.net.Socket; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -55,7 +57,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { private ThreadPool threadPool; private NettyTransport nettyTransport; private int port; - private String host; + private InetAddress host; @Before public void startThreadPool() { @@ -70,7 +72,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) nettyTransport.boundAddress().boundAddress(); port = transportAddress.address().getPort(); - host = transportAddress.address().getHostString(); + host = transportAddress.address().getAddress(); } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java index 78ba16ac750..e945f165e76 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.test.ESIntegTestCase; @@ -32,6 +33,7 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportModule; import org.junit.Test; +import java.net.InetAddress; import java.util.Locale; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -71,7 +73,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase { .put("path.home", createTempDir().toString()) .build(); try (TransportClient transportClient = TransportClient.builder().settings(settings).loadConfigSettings(false).build()) { - transportClient.addTransportAddress(new InetSocketTransportAddress("127.0.0.1", randomPort)); + transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort)); ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get(); assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN)); } @@ -93,7 +95,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase { // publish address assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class)); InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(); - assertThat(publishAddress.address().getHostName(), is("127.0.0.7")); + assertThat(NetworkAddress.formatAddress(publishAddress.address().getAddress()), is("127.0.0.7")); assertThat(publishAddress.address().getPort(), is(4321)); } } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java index 1a494de4931..11e5feed23e 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortTests.java @@ -170,7 +170,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { // Set SO_REUSEADDR as we may bind here and not be able // to reuse the address immediately without it. serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress()); - serverSocket.bind(new InetSocketAddress(nextPort)); + serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), nextPort)); // bind was a success logger.debug("port [{}] available.", nextPort); @@ -199,7 +199,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { private void assertConnectionRefused(int port) throws Exception { try { - trySocketConnection(new InetSocketTransportAddress("localhost", port).address()); + trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName("localhost"), port).address()); fail("Expected to get exception when connecting to port " + port); } catch (IOException e) { // expected @@ -213,7 +213,7 @@ public class NettyTransportMultiPortTests extends ESTestCase { private void assertPortIsBound(String host, int port) throws Exception { logger.info("Trying to connect to [{}]:[{}]", host, port); - trySocketConnection(new InetSocketTransportAddress(host, port).address()); + trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName(host), port).address()); } private void trySocketConnection(InetSocketAddress address) throws Exception { diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java new file mode 100644 index 00000000000..a5bd6612cdf --- /dev/null +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty; + +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; + +/** Unit tests for NettyTransport */ +public class NettyTransportTests extends ESTestCase { + + /** Test ipv4 host with a default port works */ + public void testParseV4DefaultPort() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE); + assertEquals(1, addresses.length); + + assertEquals("127.0.0.1", addresses[0].getAddress()); + assertEquals(1234, addresses[0].getPort()); + } + + /** Test ipv4 host with a default port range works */ + public void testParseV4DefaultRange() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE); + assertEquals(2, addresses.length); + + assertEquals("127.0.0.1", addresses[0].getAddress()); + assertEquals(1234, addresses[0].getPort()); + + assertEquals("127.0.0.1", addresses[1].getAddress()); + assertEquals(1235, addresses[1].getPort()); + } + + /** Test ipv4 host with port works */ + public void testParseV4WithPort() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE); + assertEquals(1, addresses.length); + + assertEquals("127.0.0.1", addresses[0].getAddress()); + assertEquals(2345, addresses[0].getPort()); + } + + /** Test ipv4 host with port range works */ + public void testParseV4WithPortRange() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE); + assertEquals(2, addresses.length); + + assertEquals("127.0.0.1", addresses[0].getAddress()); + assertEquals(2345, addresses[0].getPort()); + + assertEquals("127.0.0.1", addresses[1].getAddress()); + assertEquals(2346, addresses[1].getPort()); + } + + /** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */ + public void testParseV6UnBracketed() throws Exception { + try { + NettyTransport.parse("::1", "1234", Integer.MAX_VALUE); + fail("should have gotten exception"); + } catch (IllegalArgumentException expected) { + assertTrue(expected.getMessage().contains("must be bracketed")); + } + } + + /** Test ipv6 host with a default port works */ + public void testParseV6DefaultPort() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE); + assertEquals(1, addresses.length); + + assertEquals("::1", addresses[0].getAddress()); + assertEquals(1234, addresses[0].getPort()); + } + + /** Test ipv6 host with a default port range works */ + public void testParseV6DefaultRange() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE); + assertEquals(2, addresses.length); + + assertEquals("::1", addresses[0].getAddress()); + assertEquals(1234, addresses[0].getPort()); + + assertEquals("::1", addresses[1].getAddress()); + assertEquals(1235, addresses[1].getPort()); + } + + /** Test ipv6 host with port works */ + public void testParseV6WithPort() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE); + assertEquals(1, addresses.length); + + assertEquals("::1", addresses[0].getAddress()); + assertEquals(2345, addresses[0].getPort()); + } + + /** Test ipv6 host with port range works */ + public void testParseV6WithPortRange() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE); + assertEquals(2, addresses.length); + + assertEquals("::1", addresses[0].getAddress()); + assertEquals(2345, addresses[0].getPort()); + + assertEquals("::1", addresses[1].getAddress()); + assertEquals(2346, addresses[1].getPort()); + } + + /** Test per-address limit */ + public void testAddressLimit() throws Exception { + TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3); + assertEquals(3, addresses.length); + assertEquals(100, addresses[0].getPort()); + assertEquals(101, addresses[1].getPort()); + assertEquals(102, addresses[2].getPort()); + } +} diff --git a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index a0b6ddbdbb5..5b8557178f2 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -31,6 +31,9 @@ import org.elasticsearch.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.ConnectTransportException; import org.junit.Test; +import java.net.InetAddress; +import java.net.UnknownHostException; + public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { @Override @@ -44,7 +47,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { } @Test(expected = ConnectTransportException.class) - public void testConnectException() { - serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876), Version.CURRENT)); + public void testConnectException() throws UnknownHostException { + serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9876), Version.CURRENT)); } } \ No newline at end of file diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 5b8d8f32d97..0f5e8bfc9a1 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -70,7 +70,7 @@ public class TribeIT extends ESIntegTestCase { public static void setupSecondCluster() throws Exception { ESIntegTestCase.beforeClass(); // create another cluster - cluster2 = new InternalTestCluster(randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, SECOND_CLUSTER_NODE_PREFIX); + cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, SECOND_CLUSTER_NODE_PREFIX); cluster2.beforeTest(getRandom(), 0.1); cluster2.ensureAtLeastNumDataNodes(2); } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/core/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index 7bcbd996a54..10b45017b26 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -48,7 +48,7 @@ public class TribeUnitTests extends ESTestCase { private static Node tribe1; private static Node tribe2; - private static final String NODE_MODE = InternalTestCluster.nodeMode(); + private static final String NODE_MODE = InternalTestCluster.configuredNodeMode(); @BeforeClass public static void createTribes() { diff --git a/dev-tools/src/main/resources/ant/integration-tests.xml b/dev-tools/src/main/resources/ant/integration-tests.xml index 3740504d42c..09731421f1f 100644 --- a/dev-tools/src/main/resources/ant/integration-tests.xml +++ b/dev-tools/src/main/resources/ant/integration-tests.xml @@ -136,8 +136,7 @@ - - + @@ -166,7 +165,6 @@ - @@ -243,7 +241,7 @@ - diff --git a/dev-tools/src/main/resources/forbidden/all-signatures.txt b/dev-tools/src/main/resources/forbidden/all-signatures.txt index e61d58d4328..00d4871336a 100644 --- a/dev-tools/src/main/resources/forbidden/all-signatures.txt +++ b/dev-tools/src/main/resources/forbidden/all-signatures.txt @@ -59,3 +59,30 @@ java.nio.file.Files#isHidden(java.nio.file.Path) @ Dependent on the operating sy java.nio.file.Files#getFileStore(java.nio.file.Path) @ Use Environment.getFileStore() instead, impacted by JDK-8034057 java.nio.file.Files#isWritable(java.nio.file.Path) @ Use Environment.isWritable() instead, impacted by JDK-8034057 + +@defaultMessage Resolve hosts explicitly to the address(es) you want with InetAddress. +java.net.InetSocketAddress#(java.lang.String,int) +java.net.Socket#(java.lang.String,int) +java.net.Socket#(java.lang.String,int,java.net.InetAddress,int) + +@defaultMessage Don't bind to wildcard addresses. Be specific. +java.net.DatagramSocket#() +java.net.DatagramSocket#(int) +java.net.InetSocketAddress#(int) +java.net.MulticastSocket#() +java.net.MulticastSocket#(int) +java.net.ServerSocket#(int) +java.net.ServerSocket#(int,int) + +@defaultMessage use NetworkAddress format/formatAddress to print IP or IP+ports +java.net.InetAddress#toString() +java.net.InetAddress#getHostAddress() +java.net.Inet4Address#getHostAddress() +java.net.Inet6Address#getHostAddress() +java.net.InetSocketAddress#toString() + +@defaultMessage avoid DNS lookups by accident: if you have a valid reason, then @SuppressWarnings with that reason so its completely clear +java.net.InetAddress#getHostName() +java.net.InetAddress#getCanonicalHostName() + +java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which avoids a DNS lookup diff --git a/distribution/src/main/resources/config/elasticsearch.yml b/distribution/src/main/resources/config/elasticsearch.yml index b3baf765b3a..f55f210ebb7 100644 --- a/distribution/src/main/resources/config/elasticsearch.yml +++ b/distribution/src/main/resources/config/elasticsearch.yml @@ -71,13 +71,10 @@ # # --------------------------------- Discovery ---------------------------------- # -# Elasticsearch nodes will find each other via multicast, by default. -# -# To use the unicast discovery, disable the multicast discovery: -# -# discovery.zen.ping.multicast.enabled: false +# Elasticsearch nodes will find each other via unicast, by default. # # Pass an initial list of hosts to perform discovery when new node is started: +# The default list of hosts is ["127.0.0.1", "[::1]"] # # discovery.zen.ping.unicast.hosts: ["host1", "host2"] # @@ -85,6 +82,10 @@ # # discovery.zen.minimum_master_nodes: 3 # +# To use multicast for discovery, enable it: +# +# discovery.zen.ping.multicast.enabled: true +# # For more information, see the documentation at: # # diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index b1d40eaa4c3..4d9578624c1 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -203,7 +203,7 @@ We can see that our cluster named "elasticsearch" is up with a green status. Whenever we ask for the cluster health, we either get green, yellow, or red. Green means everything is good (cluster is fully functional), yellow means all data is available but some replicas are not yet allocated (cluster is fully functional), and red means some data is not available for whatever reason. Note that even if a cluster is red, it still is partially functional (i.e. it will continue to serve search requests from the available shards) but you will likely need to fix it ASAP since you have missing data. -Also from the above response, we can see and total of 1 node and that we have 0 shards since we have no data in it yet. Note that since we are using the default cluster name (elasticsearch) and since Elasticsearch uses multicast network discovery by default to find other nodes, it is possible that you could accidentally start up more than one node in your network and have them all join a single cluster. In this scenario, you may see more than 1 node in the above response. +Also from the above response, we can see and total of 1 node and that we have 0 shards since we have no data in it yet. Note that since we are using the default cluster name (elasticsearch) and since Elasticsearch uses unicast network discovery by default to find other nodes on the same machine, it is possible that you could accidentally start up more than one node on your computer and have them all join a single cluster. In this scenario, you may see more than 1 node in the above response. We can also get a list of nodes in our cluster as follows: diff --git a/docs/reference/modules/discovery/zen.asciidoc b/docs/reference/modules/discovery/zen.asciidoc index 8f0bd1f1c50..9eb835eb5ae 100644 --- a/docs/reference/modules/discovery/zen.asciidoc +++ b/docs/reference/modules/discovery/zen.asciidoc @@ -2,7 +2,7 @@ === Zen Discovery The zen discovery is the built in discovery module for elasticsearch and -the default. It provides both multicast and unicast discovery as well +the default. It provides both unicast and multicast discovery as well being easily extended to support cloud environments. The zen discovery is integrated with other modules, for example, all @@ -40,7 +40,7 @@ respond to. It provides the following settings with the |`address` |The address to bind to, defaults to `null` which means it will bind `network.bind_host` -|`enabled` |Whether multicast ping discovery is enabled. Defaults to `true`. +|`enabled` |Whether multicast ping discovery is enabled. Defaults to `false`. |======================================================================= [float] @@ -57,7 +57,8 @@ as gossip routers. It provides the following settings with the |Setting |Description |`hosts` |Either an array setting or a comma delimited setting. Each value is either in the form of `host:port`, or in the form of -`host[port1-port2]`. +`host:port1-port2`. Note that IPv6 hosts must be bracketed. Defaults to +`127.0.0.1, [::1]` |======================================================================= The unicast discovery uses the diff --git a/plugins/cloud-aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/cloud-aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java index 4ffd73c2158..ab115410c17 100644 --- a/plugins/cloud-aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java +++ b/plugins/cloud-aws/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -156,9 +156,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni } if (address != null) { try { - TransportAddress[] addresses = transportService.addressesFromString(address); - // we only limit to 1 addresses, makes no sense to ping 100 ports - for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) { + // we only limit to 1 port per address, makes no sense to ping 100 ports + TransportAddress[] addresses = transportService.addressesFromString(address, 1); + for (int i = 0; i < addresses.length; i++) { logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]); discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], version.minimumCompatibilityVersion())); } diff --git a/plugins/cloud-azure/src/main/java/org/elasticsearch/discovery/azure/AzureUnicastHostsProvider.java b/plugins/cloud-azure/src/main/java/org/elasticsearch/discovery/azure/AzureUnicastHostsProvider.java index bfd6aacbae3..4b20dfe86e1 100644 --- a/plugins/cloud-azure/src/main/java/org/elasticsearch/discovery/azure/AzureUnicastHostsProvider.java +++ b/plugins/cloud-azure/src/main/java/org/elasticsearch/discovery/azure/AzureUnicastHostsProvider.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.azure; import com.microsoft.windowsazure.management.compute.models.*; + import org.elasticsearch.Version; import org.elasticsearch.cloud.azure.AzureServiceDisableException; import org.elasticsearch.cloud.azure.AzureServiceRemoteException; @@ -28,6 +29,7 @@ import org.elasticsearch.cloud.azure.management.AzureComputeService.Discovery; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -37,6 +39,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Locale; import java.util.List; @@ -216,9 +219,9 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic if (privateIp != null) { if (privateIp.equals(ipAddress)) { - logger.trace("adding ourselves {}", ipAddress); + logger.trace("adding ourselves {}", NetworkAddress.format(ipAddress)); } - networkAddress = privateIp.getHostAddress(); + networkAddress = NetworkAddress.formatAddress(privateIp); } else { logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName()); } @@ -231,7 +234,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic continue; } - networkAddress = endpoint.getVirtualIPAddress().getHostAddress() + ":" + endpoint.getPort(); + networkAddress = NetworkAddress.formatAddress(new InetSocketAddress(endpoint.getVirtualIPAddress(), endpoint.getPort())); } if (networkAddress == null) { @@ -251,11 +254,13 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic } try { - TransportAddress[] addresses = transportService.addressesFromString(networkAddress); - // we only limit to 1 addresses, makes no sense to ping 100 ports - logger.trace("adding {}, transport_address {}", networkAddress, addresses[0]); - cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), addresses[0], + // we only limit to 1 port per address, makes no sense to ping 100 ports + TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1); + for (TransportAddress address : addresses) { + logger.trace("adding {}, transport_address {}", networkAddress, address); + cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, version.minimumCompatibilityVersion())); + } } catch (Exception e) { logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage()); } diff --git a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java index cf02dff0464..5415f583607 100644 --- a/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java +++ b/plugins/cloud-gce/src/main/java/org/elasticsearch/discovery/gce/GceUnicastHostsProvider.java @@ -22,12 +22,14 @@ package org.elasticsearch.discovery.gce; import com.google.api.services.compute.model.AccessConfig; import com.google.api.services.compute.model.Instance; import com.google.api.services.compute.model.NetworkInterface; + import org.elasticsearch.Version; import org.elasticsearch.cloud.gce.GceComputeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -114,7 +116,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas try { InetAddress inetAddress = networkService.resolvePublishHostAddress(null); if (inetAddress != null) { - ipAddress = inetAddress.getHostAddress(); + ipAddress = NetworkAddress.formatAddress(inetAddress); } } catch (IOException e) { // We can't find the publish host address... Hmmm. Too bad :-( @@ -224,13 +226,15 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas } // ip_private is a single IP Address. We need to build a TransportAddress from it - TransportAddress[] addresses = transportService.addressesFromString(address); - // If user has set `es_port` metadata, we don't need to ping all ports // we only limit to 1 addresses, makes no sense to ping 100 ports - logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type, - ip_private, addresses[0], status); - cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, addresses[0], version.minimumCompatibilityVersion())); + TransportAddress[] addresses = transportService.addressesFromString(address, 1); + + for (TransportAddress transportAddress : addresses) { + logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type, + ip_private, transportAddress, status); + cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress, version.minimumCompatibilityVersion())); + } } } catch (Exception e) { logger.warn("failed to add {}, address {}", e, name, ip_private); diff --git a/plugins/site-example/src/test/java/org/elasticsearch/example/SiteContentsIT.java b/plugins/site-example/src/test/java/org/elasticsearch/example/SiteContentsIT.java index b873e2909b3..c92a0ba719e 100644 --- a/plugins/site-example/src/test/java/org/elasticsearch/example/SiteContentsIT.java +++ b/plugins/site-example/src/test/java/org/elasticsearch/example/SiteContentsIT.java @@ -22,6 +22,7 @@ package org.elasticsearch.example; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ExternalTestCluster; import org.elasticsearch.test.TestCluster; @@ -46,7 +47,7 @@ public class SiteContentsIT extends ESIntegTestCase { for (InetSocketAddress address : externalCluster.httpAddresses()) { RestResponse restResponse = new RestResponse( new HttpRequestBuilder(httpClient) - .host(address.getHostName()).port(address.getPort()) + .host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort()) .path("/_plugin/site-example/") .method("GET").execute()); assertEquals(200, restResponse.getStatusCode()); diff --git a/pom.xml b/pom.xml index d82b7cab6f9..13bda58932d 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ +