Default to unicast discovery, with default host list of 127.0.0.1, [::1]

Fix unicast discovery to work when a host has multiple addresses.
Ban dangerous methods in java.net with forbidden APIs.
Fix ipv6 bugs and formatting of network addresses everywhere.

Closes #12999
Closes #12993

Squashed commit of the following:

commit 6c1aa001d091c5cf25212a53dc701fb704337f1e
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 14:25:43 2015 -0400

    Fix these to be correct with addresses just in case

commit 648215627e84abf58a71400e7dc9ae775efb71d6
Merge: d00561b 41d8fbe
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 13:23:09 2015 -0400

    Merge branch 'master' into unicast_all_the_way_down

commit d00561b76fd1aa5850699f7901f3dae3d4d402b7
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 16:38:50 2015 +0200

    limit local ports to 5 in UnicastZenPing

commit e2e15c594006746cbe24432694294a71cc99deb8
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 10:32:47 2015 -0400

    fix port limiting

commit 10153cb7adadda81a1f482445e703836b65cf5e2
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 10:18:37 2015 -0400

    don't serialize scopeids: that's broken

commit 2aa63d43db2baec68a2e9bc227cfeb85dfeb4f83
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 16:06:51 2015 +0200

    restore @Network

commit c840f1d1ef438826ae1ecfd5e45942a0e30dc9c0
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 16:02:30 2015 +0200

     Use NetworkAddress.formatAddress where applicable in plugins

commit 374ce878852b35d626b7a29c8c4773545b0e9ddd
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 15:34:06 2015 +0200

    Use NetworkAddress.formatAddress where applicable

commit e7a606d63f1bc43c1b62b6e17adf707c76d43a15
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 10:17:57 2015 +0200

    Add @Multicast annotation to disable multicast tests by default.

    We only run multicast tests now when we explicitly state it. A working
    multicast env is required which is not always the case.

commit 2d7d2d0347179696ab41f71f048b13305014c85b
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 09:51:28 2015 +0200

    Remove extra check for local mode in InternalTestCluster

commit dda59ac39aa136d4687b9274c2692cd77f8b8f66
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 09:37:03 2015 +0200

    Handle node mode across entire test cluster

    We used static methods reading sys properties to define the node mode
    per cluster. this had lots of problems when tests couldn't cope with
    mixed or only local mode. Now we are passing it down to the cluster from the test
    which allows to @SuppressNetworkMode / @SupressLocalMode on the test to force
    consistent node configurations.

commit 058197b7a408318995c88ce7f6762e32348de0de
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 03:19:14 2015 -0400

    really ban InetSocketAddress's trappy method and break build and go to sleep, sorry

commit ac8779185aee1e17e6f5a81766290fdfc9c603ba
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 03:16:52 2015 -0400

    Ban methods that might surprisingly cause DNS lookups

commit e64fe3dff2b11503e5f2831eb9863d64f56c5538
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 02:59:05 2015 -0400

    Add unit test

commit f15434f20fb1a3691b1cc16028597d8fae937e05
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 02:39:02 2015 -0400

    fix ipv6 formatting bugs

commit 05c2c74098052c75fbb79ea1818a295ef2e03e30
Author: Robert Muir <rmuir@apache.org>
Date:   Thu Aug 20 02:12:05 2015 -0400

    format addresses correctly so I can actually read what comes out of our logs and stats apis

commit 4f9389dcf1e8925f23153c5eb271b4ce2294dbaf
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 21:26:52 2015 -0400

    ban dangerous methods in java.net

commit 6aacd4d9925f324903d1d099a6cf5f862aeaf677
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 20:59:24 2015 -0400

    ban lenient method

commit f466a842c60163d1f4554bdce8a4163edb534c2c
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 00:29:00 2015 +0200

    fix tests to not mix local transport and zen unicast disco

commit 0de007a33b33fb68cf85cd86db4ca4f8ce10bbc9
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 00:10:07 2015 +0200

    fix tests to not mix local transport and zen unicast disco

commit 539f6ca6e5137e0d496239adc8684688dedcc824
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Aug 20 00:02:01 2015 +0200

    fix tests to not mix local transport and zen unicast disco

commit 004c2881b25467f332acc8c9f9e92b1f0f9d314e
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 17:51:45 2015 -0400

    Fix multinode

commit 54113af325ce31571811c49fdaae89d5687be4ba
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 17:36:45 2015 -0400

    fix integration tests

commit 0156a77a56319d6b9737ec6a531992052e50bd59
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Aug 19 23:32:18 2015 +0200

    enable multicast in MulticastZenPingIT.java

commit 1791caa35da853ce0122485fa3fd4674c671ec6e
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 17:23:16 2015 -0400

    Fix constant

commit 22820b53e0b2dc9fd47145c2bc29ce912a8fd484
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Aug 19 22:59:09 2015 +0200

    give it some extra ids for local transport crazyness

commit b2138fafa94a8a085813fd48356df63e57ade5b3
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Aug 19 22:51:42 2015 +0200

    pass on local addresses from configured transport rather than hard code IP addresses

commit 1bf5de1f457b081e0ce262b57d2b55d39c434156
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Aug 19 22:04:31 2015 +0200

    fix PluggableTransportModuleIT.java to use local disco and detach port limit for node local disco

commit b6706eddfa04c43947c16551359ae98a463d34aa
Author: Robert Muir <rmuir@apache.org>
Date:   Wed Aug 19 14:16:03 2015 -0400

    Default to unicast discovery, with default host list of 127.0.0.1, [::1]
This commit is contained in:
Robert Muir 2015-08-20 14:26:40 -04:00
parent 41d8fbe8f5
commit e2ab62596f
64 changed files with 898 additions and 252 deletions

View File

@ -360,16 +360,16 @@ public class DiscoveryNode implements Streamable, ToXContent {
public String toString() {
StringBuilder sb = new StringBuilder();
if (nodeName.length() > 0) {
sb.append('[').append(nodeName).append(']');
sb.append('{').append(nodeName).append('}');
}
if (nodeId != null) {
sb.append('[').append(nodeId).append(']');
sb.append('{').append(nodeId).append('}');
}
if (Strings.hasLength(hostName)) {
sb.append('[').append(hostName).append(']');
sb.append('{').append(hostName).append('}');
}
if (address != null) {
sb.append('[').append(address).append(']');
sb.append('{').append(address).append('}');
}
if (!attributes.isEmpty()) {
sb.append(attributes);

View File

@ -84,6 +84,7 @@ public class Loggers {
}
}
@SuppressForbidden(reason = "do not know what this method does")
public static ESLogger getLogger(String loggerName, Settings settings, String... prefixes) {
List<String> prefixesList = newArrayList();
if (settings.getAsBoolean("logger.logHostAddress", false)) {

View File

@ -114,22 +114,22 @@ final class IfConfig {
InetAddress address = interfaceAddress.getAddress();
if (address instanceof Inet6Address) {
sb.append("inet6 ");
sb.append(address.toString().substring(1));
sb.append(NetworkAddress.formatAddress(address));
sb.append(" prefixlen:");
sb.append(interfaceAddress.getNetworkPrefixLength());
} else {
sb.append("inet ");
sb.append(address.toString().substring(1));
sb.append(NetworkAddress.formatAddress(address));
int netmask = 0xFFFFFFFF << (32 - interfaceAddress.getNetworkPrefixLength());
sb.append(" netmask:" + InetAddress.getByAddress(new byte[] {
sb.append(" netmask:" + NetworkAddress.formatAddress(InetAddress.getByAddress(new byte[] {
(byte)(netmask >>> 24),
(byte)(netmask >>> 16 & 0xFF),
(byte)(netmask >>> 8 & 0xFF),
(byte)(netmask & 0xFF)
}).toString().substring(1));
})));
InetAddress broadcast = interfaceAddress.getBroadcast();
if (broadcast != null) {
sb.append(" broadcast:" + broadcast.toString().substring(1));
sb.append(" broadcast:" + NetworkAddress.formatAddress(broadcast));
}
}
if (address.isLoopbackAddress()) {

View File

@ -20,7 +20,9 @@
package org.elasticsearch.common.network;
import com.google.common.collect.Maps;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.ESLogger;
@ -257,6 +259,7 @@ public abstract class MulticastChannel implements Closeable {
/**
* Simple implementation of a channel.
*/
@SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare")
private static class Plain extends MulticastChannel {
private final ESLogger logger;
private final Config config;

View File

@ -0,0 +1,183 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.network;
import com.google.common.net.InetAddresses;
import org.elasticsearch.common.SuppressForbidden;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* Utility functions for presentation of network addresses.
* <p>
* Java's address formatting is particularly bad, every address
* has an optional host if its resolved, so IPv4 addresses often
* look like this (note the confusing leading slash):
* <pre>
* {@code /127.0.0.1}
* </pre>
* IPv6 addresses are even worse, with no IPv6 address compression,
* and often containing things like numeric scopeids, which are even
* more confusing (e.g. not going to work in any user's browser, refer
* to an interface on <b>another</b> machine, etc):
* <pre>
* {@code /0:0:0:0:0:0:0:1%1}
* </pre>
* This class provides sane address formatting instead, e.g.
* {@code 127.0.0.1} and {@code ::1} respectively. No methods do reverse
* lookups.
*/
public final class NetworkAddress {
/** No instantiation */
private NetworkAddress() {}
/**
* Formats a network address (with optional host) for display purposes.
* <p>
* If the host is already resolved (typically because, we looked up
* a name to do that), then we include it, otherwise it is
* omitted. See {@link #formatAddress(InetAddress)} if you only
* want the address.
* <p>
* IPv6 addresses are compressed and without scope
* identifiers.
* <p>
* Example output with already-resolved hostnames:
* <ul>
* <li>IPv4: {@code localhost/127.0.0.1}</li>
* <li>IPv6: {@code localhost/::1}</li>
* </ul>
* <p>
* Example output with just an address:
* <ul>
* <li>IPv4: {@code 127.0.0.1}</li>
* <li>IPv6: {@code ::1}</li>
* </ul>
* @param address IPv4 or IPv6 address
* @return formatted string
* @see #formatAddress(InetAddress)
*/
public static String format(InetAddress address) {
return format(address, -1, true);
}
/**
* Formats a network address and port for display purposes.
* <p>
* If the host is already resolved (typically because, we looked up
* a name to do that), then we include it, otherwise it is
* omitted. See {@link #formatAddress(InetSocketAddress)} if you only
* want the address.
* <p>
* This formats the address with {@link #format(InetAddress)}
* and appends the port number. IPv6 addresses will be bracketed.
* <p>
* Example output with already-resolved hostnames:
* <ul>
* <li>IPv4: {@code localhost/127.0.0.1:9300}</li>
* <li>IPv6: {@code localhost/[::1]:9300}</li>
* </ul>
* <p>
* Example output with just an address:
* <ul>
* <li>IPv4: {@code 127.0.0.1:9300}</li>
* <li>IPv6: {@code [::1]:9300}</li>
* </ul>
* @param address IPv4 or IPv6 address with port
* @return formatted string
* @see #formatAddress(InetSocketAddress)
*/
public static String format(InetSocketAddress address) {
return format(address.getAddress(), address.getPort(), true);
}
/**
* Formats a network address for display purposes.
* <p>
* This formats only the address, any hostname information,
* if present, is ignored. IPv6 addresses are compressed
* and without scope identifiers.
* <p>
* Example output with just an address:
* <ul>
* <li>IPv4: {@code 127.0.0.1}</li>
* <li>IPv6: {@code ::1}</li>
* </ul>
* @param address IPv4 or IPv6 address
* @return formatted string
*/
public static String formatAddress(InetAddress address) {
return format(address, -1, false);
}
/**
* Formats a network address and port for display purposes.
* <p>
* This formats the address with {@link #formatAddress(InetAddress)}
* and appends the port number. IPv6 addresses will be bracketed.
* Any host information, if present is ignored.
* <p>
* Example output:
* <ul>
* <li>IPv4: {@code 127.0.0.1:9300}</li>
* <li>IPv6: {@code [::1]:9300}</li>
* </ul>
* @param address IPv4 or IPv6 address with port
* @return formatted string
*/
public static String formatAddress(InetSocketAddress address) {
return format(address.getAddress(), address.getPort(), false);
}
// note, we don't validate port, because we only allow InetSocketAddress
@SuppressForbidden(reason = "we call toString to avoid a DNS lookup")
static String format(InetAddress address, int port, boolean includeHost) {
Objects.requireNonNull(address);
StringBuilder builder = new StringBuilder();
if (includeHost) {
// must use toString, to avoid DNS lookup. but the format is specified in the spec
String toString = address.toString();
int separator = toString.indexOf('/');
if (separator > 0) {
// append hostname, with the slash too
builder.append(toString, 0, separator + 1);
}
}
if (port != -1 && address instanceof Inet6Address) {
builder.append(InetAddresses.toUriString(address));
} else {
builder.append(InetAddresses.toAddrString(address));
}
if (port != -1) {
builder.append(':');
builder.append(port);
}
return builder.toString();
}
}

View File

@ -19,8 +19,10 @@
package org.elasticsearch.common.transport;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.network.NetworkAddress;
import java.io.IOException;
import java.net.Inet6Address;
@ -32,6 +34,7 @@ import java.net.InetSocketAddress;
*/
public final class InetSocketTransportAddress implements TransportAddress {
// TODO: do we really need this option, why do resolving? - remove this as a follow-up
private static boolean resolveAddress = false;
public static void setResolveAddress(boolean resolveAddress) {
@ -53,15 +56,14 @@ public final class InetSocketTransportAddress implements TransportAddress {
in.readFully(a);
InetAddress inetAddress;
if (len == 16) {
int scope_id = in.readInt();
inetAddress = Inet6Address.getByAddress(null, a, scope_id);
inetAddress = Inet6Address.getByAddress(null, a);
} else {
inetAddress = InetAddress.getByAddress(a);
}
int port = in.readInt();
this.address = new InetSocketAddress(inetAddress, port);
} else {
this.address = new InetSocketAddress(in.readString(), in.readInt());
this.address = new InetSocketAddress(InetAddress.getByName(in.readString()), in.readInt());
}
}
@ -69,10 +71,6 @@ public final class InetSocketTransportAddress implements TransportAddress {
address = null;
}
public InetSocketTransportAddress(String hostname, int port) {
this(new InetSocketAddress(hostname, port));
}
public InetSocketTransportAddress(InetAddress address, int port) {
this(new InetSocketAddress(address, port));
}
@ -94,16 +92,12 @@ public final class InetSocketTransportAddress implements TransportAddress {
@Override
public String getHost() {
if (resolveAddress) {
return address.getHostName();
} else {
return getAddress();
}
return maybeLookupHostname();
}
@Override
public String getAddress() {
return address.getAddress().getHostAddress();
return NetworkAddress.formatAddress(address.getAddress());
}
@Override
@ -127,15 +121,25 @@ public final class InetSocketTransportAddress implements TransportAddress {
byte[] bytes = address().getAddress().getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6)
out.writeByte((byte) bytes.length); // 1 byte
out.write(bytes, 0, bytes.length);
if (address().getAddress() instanceof Inet6Address)
out.writeInt(((Inet6Address) address.getAddress()).getScopeId());
// don't serialize scope ids over the network!!!!
// these only make sense with respect to the local machine, and will only formulate
// the address incorrectly remotely.
} else {
out.writeByte((byte) 1);
out.writeString(address.getHostName());
out.writeString(maybeLookupHostname());
}
out.writeInt(address.getPort());
}
@SuppressForbidden(reason = "if explicitly configured we do hostName reverse lookup") // TODO remove this?
private String maybeLookupHostname() {
if (resolveAddress) {
return address.getHostName();
} else {
return getAddress();
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -151,6 +155,6 @@ public final class InetSocketTransportAddress implements TransportAddress {
@Override
public String toString() {
return "inet[" + address + "]";
return NetworkAddress.format(address);
}
}

View File

@ -53,10 +53,10 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
Version version, ElectMasterService electMasterService, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (this.settings.getAsBoolean("discovery.zen.ping.multicast.enabled", true)) {
if (this.settings.getAsBoolean("discovery.zen.ping.multicast.enabled", false)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService, version));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
// always add the unicast hosts, or things get angry!
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, version, electMasterService, unicastHostsProviders));
this.zenPings = zenPingsBuilder.build();

View File

@ -64,7 +64,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
public static final int LIMIT_PORTS_COUNT = 1;
// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
private final ThreadPool threadPool;
private final TransportService transportService;
@ -117,15 +120,24 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
hostArr[i] = hostArr[i].trim();
}
List<String> hosts = Lists.newArrayList(hostArr);
final int limitPortCounts;
if (hosts.isEmpty()) {
// if unicast hosts are not specified, fill with simple defaults on the local machine
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
hosts.addAll(transportService.getLocalAddresses());
} else {
// we only limit to 1 addresses, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
}
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> configuredTargetNodes = Lists.newArrayList();
for (String host : hosts) {
try {
TransportAddress[] addresses = transportService.addressesFromString(host);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < LIMIT_PORTS_COUNT); i++) {
configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", addresses[i], version.minimumCompatibilityVersion()));
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
configuredTargetNodes.add(new DiscoveryNode(UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", address, version.minimumCompatibilityVersion()));
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
@ -274,7 +275,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
private void bindAddress(final InetAddress hostAddress) {
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<SocketAddress> boundSocket = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
@ -282,7 +283,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
synchronized (serverChannels) {
Channel channel = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
serverChannels.add(channel);
boundSocket.set(channel.getLocalAddress());
boundSocket.set((InetSocketAddress) channel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
@ -294,7 +295,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
if (!success) {
throw new BindHttpException("Failed to bind to [" + port + "]", lastException.get());
}
logger.info("Bound http to address [{}]", boundSocket.get());
logger.info("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
@Override

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -32,6 +33,7 @@ import java.util.Map;
*/
public interface Transport extends LifecycleComponent<Transport> {
public static class TransportSettings {
public static final String TRANSPORT_TCP_COMPRESS = "transport.tcp.compress";
}
@ -52,7 +54,7 @@ public interface Transport extends LifecycleComponent<Transport> {
/**
* Returns an address from its string representation.
*/
TransportAddress[] addressesFromString(String address) throws Exception;
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception;
/**
* Is the address type supported.
@ -89,4 +91,6 @@ public interface Transport extends LifecycleComponent<Transport> {
* Returns count of currently open connections
*/
long serverOpen();
List<String> getLocalAddresses();
}

View File

@ -92,7 +92,6 @@ public class TransportModule extends AbstractModule {
}
bind(NamedWriteableRegistry.class).asEagerSingleton();
if (configuredTransport != null) {
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
bind(Transport.class).to(configuredTransport).asEagerSingleton();

View File

@ -40,10 +40,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -221,6 +218,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return transport.boundAddress();
}
public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(localNode) || transport.nodeConnected(node);
}
@ -383,8 +384,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return requestIds.getAndIncrement();
}
public TransportAddress[] addressesFromString(String address) throws Exception {
return transport.addressesFromString(address);
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
return transport.addressesFromString(address, perAddressLimit);
}
/**

View File

@ -41,8 +41,7 @@ import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@ -57,14 +56,13 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public class LocalTransport extends AbstractLifecycleComponent<Transport> implements Transport {
public static final String LOCAL_TRANSPORT_THREAD_NAME_PREFIX = "local_transport";
private final ThreadPool threadPool;
private final ThreadPoolExecutor workers;
private final Version version;
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
private volatile LocalTransportAddress localAddress;
private final static ConcurrentMap<TransportAddress, LocalTransport> transports = newConcurrentMap();
private final static ConcurrentMap<LocalTransportAddress, LocalTransport> transports = newConcurrentMap();
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
private final NamedWriteableRegistry namedWriteableRegistry;
@ -78,7 +76,6 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
super(settings);
this.threadPool = threadPool;
this.version = version;
int workerCount = this.settings.getAsInt(TRANSPORT_LOCAL_WORKERS, EsExecutors.boundedNumberOfProcessors(settings));
int queueSize = this.settings.getAsInt(TRANSPORT_LOCAL_QUEUE, -1);
logger.debug("creating [{}] workers, queue_size [{}]", workerCount, queueSize);
@ -88,7 +85,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override
public TransportAddress[] addressesFromString(String address) {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
return new TransportAddress[]{new LocalTransportAddress(address)};
}
@ -359,4 +356,9 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
logger.error("failed to handle exception response [{}]", t, handler);
}
}
@Override
public List<String> getLocalAddresses() {
return Collections.singletonList("0.0.0.0");
}
}

View File

@ -22,8 +22,8 @@ package org.elasticsearch.transport.netty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -42,6 +42,7 @@ import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
@ -75,6 +76,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.util.*;
import java.util.concurrent.*;
@ -82,6 +84,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -118,6 +122,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
public static final String DEFAULT_PORT_RANGE = "9300-9400";
public static final String DEFAULT_PROFILE = "default";
private static final List<String> LOCAL_ADDRESSES = Arrays.asList("127.0.0.1", "[::1]");
protected final NetworkService networkService;
protected final Version version;
@ -405,7 +410,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
}
if (logger.isDebugEnabled()) {
logger.debug("binding server bootstrap to: {}", hostAddresses);
String[] addresses = new String[hostAddresses.length];
for (int i = 0; i < hostAddresses.length; i++) {
addresses[i] = NetworkAddress.format(hostAddresses[i]);
}
logger.debug("binding server bootstrap to: {}", addresses);
}
for (InetAddress hostAddress : hostAddresses) {
bindServerBootstrap(name, hostAddress, settings);
@ -417,7 +426,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
String port = settings.get("port");
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<SocketAddress> boundSocket = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
@ -430,7 +439,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverChannels.put(name, list);
}
list.add(channel);
boundSocket.set(channel.getLocalAddress());
boundSocket.set((InetSocketAddress)channel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
@ -444,7 +453,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
if (!DEFAULT_PROFILE.equals(name)) {
InetSocketAddress boundAddress = (InetSocketAddress) boundSocket.get();
InetSocketAddress boundAddress = boundSocket.get();
int publishPort = settings.getAsInt("publish_port", boundAddress.getPort());
String publishHost = settings.get("publish_host", boundAddress.getHostString());
InetSocketAddress publishAddress = createPublishAddress(publishHost, publishPort);
@ -452,7 +461,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
profileBoundAddresses.putIfAbsent(name, new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)));
}
logger.info("Bound profile [{}] to address [{}]", name, boundSocket.get());
logger.info("Bound profile [{}] to address {{}}", name, NetworkAddress.format(boundSocket.get()));
}
private void createServerBootstrap(String name, Settings settings) {
@ -582,35 +591,65 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override
public TransportAddress[] addressesFromString(String address) throws Exception {
int index = address.indexOf('[');
if (index != -1) {
String host = address.substring(0, index);
Set<String> ports = Strings.commaDelimitedListToSet(address.substring(index + 1, address.indexOf(']')));
List<TransportAddress> addresses = Lists.newArrayList();
for (String port : ports) {
int[] iPorts = new PortsRange(port).ports();
for (int iPort : iPorts) {
addresses.add(new InetSocketTransportAddress(host, iPort));
}
}
return addresses.toArray(new TransportAddress[addresses.size()]);
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
return parse(address, settings.get("transport.profiles.default.port",
settings.get("transport.netty.port",
settings.get("transport.tcp.port",
DEFAULT_PORT_RANGE))), perAddressLimit);
}
// this code is a take on guava's HostAndPort, like a HostAndPortRange
// pattern for validating ipv6 bracked addresses.
// not perfect, but PortsRange should take care of any port range validation, not a regex
private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");
/** parse a hostname+port range spec into its equivalent addresses */
static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
Objects.requireNonNull(hostPortString);
String host;
String portString = null;
if (hostPortString.startsWith("[")) {
// Parse a bracketed host, typically an IPv6 literal.
Matcher matcher = BRACKET_PATTERN.matcher(hostPortString);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString);
}
host = matcher.group(1);
portString = matcher.group(2); // could be null
} else {
index = address.lastIndexOf(':');
if (index == -1) {
List<TransportAddress> addresses = Lists.newArrayList();
String defaultPort = settings.get("transport.profiles.default.port", settings.get("transport.netty.port", this.settings.get("transport.tcp.port", DEFAULT_PORT_RANGE)));
int[] iPorts = new PortsRange(defaultPort).ports();
for (int iPort : iPorts) {
addresses.add(new InetSocketTransportAddress(address, iPort));
}
return addresses.toArray(new TransportAddress[addresses.size()]);
} else {
String host = address.substring(0, index);
int port = Integer.parseInt(address.substring(index + 1));
return new TransportAddress[]{new InetSocketTransportAddress(host, port)};
int colonPos = hostPortString.indexOf(':');
if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) {
// Exactly 1 colon. Split into host:port.
host = hostPortString.substring(0, colonPos);
portString = hostPortString.substring(colonPos + 1);
} else {
// 0 or 2+ colons. Bare hostname or IPv6 literal.
host = hostPortString;
// 2+ colons and not bracketed: exception
if (colonPos >= 0) {
throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString);
}
}
}
// if port isn't specified, fill with the default
if (portString == null || portString.isEmpty()) {
portString = defaultPortRange;
}
// generate address for each port in the range
Set<InetAddress> addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
List<TransportAddress> transportAddresses = new ArrayList<>();
int[] ports = new PortsRange(portString).ports();
int limit = Math.min(ports.length, perAddressLimit);
for (int i = 0; i < limit; i++) {
for (InetAddress address : addresses) {
transportAddresses.add(new InetSocketTransportAddress(address, ports[i]));
}
}
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
}
@Override
@ -673,6 +712,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return channels == null ? 0 : channels.numberOfOpenChannels();
}
@Override
public List<String> getLocalAddresses() {
return LOCAL_ADDRESSES;
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport;
import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.transport.TransportRequestOptions.options;
@ -44,7 +45,7 @@ import static org.elasticsearch.transport.TransportRequestOptions.options;
*/
public class BenchmarkNettyLargeMessages {
public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws Exception {
final ByteSizeValue payloadSize = new ByteSizeValue(10, ByteSizeUnit.MB);
final int NUMBER_OF_ITERATIONS = 100000;
final int NUMBER_OF_CLIENTS = 5;
@ -63,7 +64,7 @@ public class BenchmarkNettyLargeMessages {
new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()), threadPool
).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT);
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300), Version.CURRENT);
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));
final DiscoveryNode smallNode = bigNode;

View File

@ -27,13 +27,14 @@ import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
public class NettyEchoBenchmark {
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
final int payloadSize = 100;
int CYCLE_SIZE = 50000;
final long NUMBER_OF_ITERATIONS = 500000;
@ -58,7 +59,7 @@ public class NettyEchoBenchmark {
});
// Bind and start to accept incoming connections.
serverBootstrap.bind(new InetSocketAddress(9000));
serverBootstrap.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9000));
ClientBootstrap clientBootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
@ -78,7 +79,7 @@ public class NettyEchoBenchmark {
});
// Start the connection attempt.
ChannelFuture future = clientBootstrap.connect(new InetSocketAddress("localhost", 9000));
ChannelFuture future = clientBootstrap.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9000));
future.awaitUninterruptibly();
Channel clientChannel = future.getChannel();

View File

@ -33,7 +33,6 @@ public class UnicastBackwardsCompatibilityIT extends ESBackcompatTestCase {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("transport.tcp.port", 9380 + nodeOrdinal)
.put("discovery.zen.ping.multicast.enabled", false)
.put("discovery.zen.ping.unicast.hosts", "localhost:9380,localhost:9381,localhost:9390,localhost:9391")
.build();
}
@ -43,7 +42,6 @@ public class UnicastBackwardsCompatibilityIT extends ESBackcompatTestCase {
return Settings.settingsBuilder()
.put(super.externalNodeSettings(nodeOrdinal))
.put("transport.tcp.port", 9390 + nodeOrdinal)
.put("discovery.zen.ping.multicast.enabled", false)
.put("discovery.zen.ping.unicast.hosts", "localhost:9380,localhost:9381,localhost:9390,localhost:9391")
.build();
}

View File

@ -123,7 +123,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
}
@Override
public TransportAddress[] addressesFromString(String address) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
throw new UnsupportedOperationException();
}

View File

@ -32,6 +32,8 @@ import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -53,6 +55,11 @@ public class TransportClientNodesServiceTests extends ESTestCase {
TestIteration() {
threadPool = new ThreadPool("transport-client-nodes-service-tests");
transport = new FailAndRetryMockTransport<TestResponse>(getRandom()) {
@Override
public List<String> getLocalAddresses() {
return Collections.EMPTY_LIST;
}
@Override
protected TestResponse newResponse() {
return new TestResponse();

View File

@ -60,7 +60,7 @@ public class TransportClientRetryIT extends ESIntegTestCase {
Settings.Builder builder = settingsBuilder().put("client.transport.nodes_sampler_interval", "1s")
.put("name", "transport_client_retry_test")
.put("node.mode", InternalTestCluster.nodeMode())
.put("node.mode", internalCluster().getNodeMode())
.put(ClusterName.SETTING, internalCluster().getClusterName())
.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true)
.put("path.home", createTempDir());

View File

@ -66,6 +66,7 @@ import static org.hamcrest.Matchers.notNullValue;
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class ClusterServiceIT extends ESIntegTestCase {
@Test

View File

@ -44,6 +44,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class MinimumMasterNodesIT extends ESIntegTestCase {
@Test

View File

@ -57,6 +57,7 @@ import static org.hamcrest.Matchers.lessThan;
/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class NoMasterNodeIT extends ESIntegTestCase {
@Test

View File

@ -34,6 +34,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class SpecificMasterNodesIT extends ESIntegTestCase {
protected final Settings.Builder settingsBuilder() {

View File

@ -0,0 +1,107 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.network;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
/**
* Tests for network address formatting. Please avoid using any methods that cause DNS lookups!
*/
public class NetworkAddressTests extends ESTestCase {
public void testFormatV4() throws Exception {
assertEquals("localhost/127.0.0.1", NetworkAddress.format(forge("localhost", "127.0.0.1")));
assertEquals("127.0.0.1", NetworkAddress.format(forge(null, "127.0.0.1")));
}
public void testFormatV6() throws Exception {
assertEquals("localhost/::1", NetworkAddress.format(forge("localhost", "::1")));
assertEquals("::1", NetworkAddress.format(forge(null, "::1")));
}
public void testFormatAddressV4() throws Exception {
assertEquals("127.0.0.1", NetworkAddress.formatAddress(forge("localhost", "127.0.0.1")));
assertEquals("127.0.0.1", NetworkAddress.formatAddress(forge(null, "127.0.0.1")));
}
public void testFormatAddressV6() throws Exception {
assertEquals("::1", NetworkAddress.formatAddress(forge("localhost", "::1")));
assertEquals("::1", NetworkAddress.formatAddress(forge(null, "::1")));
}
public void testFormatPortV4() throws Exception {
assertEquals("localhost/127.0.0.1:1234", NetworkAddress.format(new InetSocketAddress(forge("localhost", "127.0.0.1"), 1234)));
assertEquals("127.0.0.1:1234", NetworkAddress.format(new InetSocketAddress(forge(null, "127.0.0.1"), 1234)));
}
public void testFormatPortV6() throws Exception {
assertEquals("localhost/[::1]:1234", NetworkAddress.format(new InetSocketAddress(forge("localhost", "::1"), 1234)));
assertEquals("[::1]:1234",NetworkAddress.format(new InetSocketAddress(forge(null, "::1"), 1234)));
}
public void testFormatAddressPortV4() throws Exception {
assertEquals("127.0.0.1:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge("localhost", "127.0.0.1"), 1234)));
assertEquals("127.0.0.1:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge(null, "127.0.0.1"), 1234)));
}
public void testFormatAddressPortV6() throws Exception {
assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge("localhost", "::1"), 1234)));
assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forge(null, "::1"), 1234)));
}
public void testNoScopeID() throws Exception {
assertEquals("::1", NetworkAddress.format(forgeScoped(null, "::1", 5)));
assertEquals("localhost/::1", NetworkAddress.format(forgeScoped("localhost", "::1", 5)));
assertEquals("::1", NetworkAddress.formatAddress(forgeScoped(null, "::1", 5)));
assertEquals("::1", NetworkAddress.formatAddress(forgeScoped("localhost", "::1", 5)));
assertEquals("[::1]:1234", NetworkAddress.format(new InetSocketAddress(forgeScoped(null, "::1", 5), 1234)));
assertEquals("localhost/[::1]:1234", NetworkAddress.format(new InetSocketAddress(forgeScoped("localhost", "::1", 5), 1234)));
assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forgeScoped(null, "::1", 5), 1234)));
assertEquals("[::1]:1234", NetworkAddress.formatAddress(new InetSocketAddress(forgeScoped("localhost", "::1", 5), 1234)));
}
/** creates address without any lookups. hostname can be null, for missing */
private InetAddress forge(String hostname, String address) throws IOException {
if (hostname == null) {
return InetAddress.getByName(address);
} else {
byte bytes[] = InetAddress.getByName(address).getAddress();
return InetAddress.getByAddress(hostname, bytes);
}
}
/** creates scoped ipv6 address without any lookups. hostname can be null, for missing */
private InetAddress forgeScoped(String hostname, String address, int scopeid) throws IOException {
byte bytes[] = InetAddress.getByName(address).getAddress();
if (hostname == null) {
return Inet6Address.getByAddress(hostname, bytes);
} else {
return Inet6Address.getByAddress(hostname, bytes, scopeid);
}
}
}

View File

@ -72,6 +72,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@ESIntegTestCase.SuppressLocalMode
public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
@ -142,12 +143,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put("transport.host", "127.0.0.1") // only bind on one IF we use v4 here by default
.put("transport.bind_host", "127.0.0.1")
.put("transport.publish_host", "127.0.0.1")
.put("gateway.local.list_timeout", "10s") // still long to induce failures but to long so test won't time out
.put("plugin.types", MockTransportService.TestPlugin.class.getName())
.build();
private void configureCluster(int numberOfNodes, int minimumMasterNode) throws ExecutionException, InterruptedException {
if (randomBoolean()) {
if (randomBoolean() && canUseMuticast()) {
configureMulticastCluster(numberOfNodes, minimumMasterNode);
} else {
configureUnicastCluster(numberOfNodes, null, minimumMasterNode);
@ -159,10 +163,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
logger.info("---> configured multicast");
// TODO: Rarely use default settings form some of these
Settings settings = Settings.builder()
.put(DEFAULT_SETTINGS)
.put("discovery.zen.ping.multicast.enabled", true)
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, minimumMasterNode)
.put()
.build();
if (discoveryConfig == null) {
@ -174,6 +181,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
if (minimumMasterNode < 0) {
minimumMasterNode = numberOfNodes / 2 + 1;
}
logger.info("---> configured unicast");
// TODO: Rarely use default settings form some of these
Settings nodeSettings = Settings.builder()
.put(DEFAULT_SETTINGS)

View File

@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class ZenUnicastDiscoveryIT extends ESIntegTestCase {
private ClusterDiscoveryConfiguration discoveryConfig;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
@ -41,15 +42,13 @@ import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -66,6 +65,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@ESIntegTestCase.SuppressLocalMode
public class ZenDiscoveryIT extends ESIntegTestCase {
@Test
@ -226,15 +226,14 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
}
@Test
public void testHandleNodeJoin_incompatibleMinVersion() {
public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException {
Settings nodeSettings = Settings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("node.mode", "local") // <-- force local transport so we can fake a network address
.build();
String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0_beta1);
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);
DiscoveryNode node = new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.V_1_6_0);
DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), Version.V_1_6_0);
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() {
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -32,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -44,12 +46,14 @@ import java.net.MulticastSocket;
import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.Multicast
public class MulticastZenPingIT extends ESTestCase {
private Settings buildRandomMulticast(Settings settings) {
Settings.Builder builder = Settings.builder().put(settings);
builder.put("discovery.zen.ping.multicast.group", "224.2.3." + randomIntBetween(0, 255));
builder.put("discovery.zen.ping.multicast.port", randomIntBetween(55000, 56000));
builder.put("discovery.zen.ping.multicast.enabled", true);
if (randomBoolean()) {
builder.put("discovery.zen.ping.multicast.shared", randomBoolean());
}
@ -129,7 +133,7 @@ public class MulticastZenPingIT extends ESTestCase {
}
}
@Test
@Test @SuppressForbidden(reason = "I bind to wildcard addresses. I am a total nightmare")
public void testExternalPing() throws Exception {
Settings settings = Settings.EMPTY;
settings = buildRandomMulticast(settings);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -39,6 +40,8 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.Test;
import java.net.InetSocketAddress;
import static org.hamcrest.Matchers.equalTo;
public class UnicastZenPingIT extends ESTestCase {
@ -68,8 +71,8 @@ public class UnicastZenPingIT extends ESTestCase {
InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();
Settings hostsSettings = Settings.settingsBuilder().putArray("discovery.zen.ping.unicast.hosts",
addressA.address().getAddress().getHostAddress() + ":" + addressA.address().getPort(),
addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort())
NetworkAddress.formatAddress(new InetSocketAddress(addressA.address().getAddress(), addressA.address().getPort())),
NetworkAddress.formatAddress(new InetSocketAddress(addressB.address().getAddress(), addressB.address().getPort())))
.build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null);

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.http.netty.pipelining;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.test.ESTestCase;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
@ -32,6 +33,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@ -57,7 +59,7 @@ public class HttpPipeliningHandlerTest extends ESTestCase {
private static final long CONNECTION_TIMEOUT = 10000L;
private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8";
// TODO make me random
private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080);
private static final InetSocketAddress HOST_ADDR = new InetSocketAddress(InetAddress.getLoopbackAddress(), 9080);
private static final String PATH1 = "/1";
private static final String PATH2 = "/2";
private static final String SOME_RESPONSE_TEXT = "some response for ";
@ -123,13 +125,14 @@ public class HttpPipeliningHandlerTest extends ESTestCase {
assertTrue(connectionFuture.await(CONNECTION_TIMEOUT));
final Channel clientChannel = connectionFuture.getChannel();
// NetworkAddress.formatAddress makes a proper HOST header.
final HttpRequest request1 = new DefaultHttpRequest(
HTTP_1_1, HttpMethod.GET, PATH1);
request1.headers().add(HOST, HOST_ADDR.toString());
request1.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR));
final HttpRequest request2 = new DefaultHttpRequest(
HTTP_1_1, HttpMethod.GET, PATH2);
request2.headers().add(HOST, HOST_ADDR.toString());
request2.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR));
clientChannel.write(request1);
clientChannel.write(request2);

View File

@ -48,6 +48,7 @@ import static org.hamcrest.Matchers.equalTo;
* Test failure when index replication actions fail mid-flight
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@ESIntegTestCase.SuppressLocalMode
public class TransportIndexFailuresIT extends ESIntegTestCase {
private static final Settings nodeSettings = Settings.settingsBuilder()

View File

@ -64,6 +64,7 @@ import static org.hamcrest.Matchers.*;
/**
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, transportClientRatio = 0)
@ESIntegTestCase.SuppressLocalMode
public class RareClusterStateIT extends ESIntegTestCase {
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.threadpool.ThreadPool;
@ -49,6 +50,7 @@ public class PluggableTransportModuleIT extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
.put(DiscoveryModule.DISCOVERY_TYPE_KEY, "local")
.put("plugin.types", CountingSentRequestsPlugin.class.getName())
.build();
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.plugins;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
@ -51,8 +52,10 @@ import org.junit.Test;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
@ -605,7 +608,7 @@ public class PluginManagerIT extends ESIntegTestCase {
}
});
Channel channel = serverBootstrap.bind(new InetSocketAddress("localhost", 0));
Channel channel = serverBootstrap.bind(new InetSocketAddress(InetAddress.getByName("localhost"), 0));
int port = ((InetSocketAddress) channel.getLocalAddress()).getPort();
// IO_ERROR because there is no real file delivered...
assertStatus(String.format(Locale.ROOT, "install https://user:pass@localhost:%s/foo.zip --verbose --timeout 1s", port), ExitStatus.IO_ERROR);
@ -645,7 +648,6 @@ public class PluginManagerIT extends ESIntegTestCase {
private Tuple<Settings, Environment> buildInitialSettings() throws IOException {
Settings settings = settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false)
.put("http.enabled", true)
.put("path.home", createTempDir()).build();
return InternalSettingsPreparer.prepareSettings(settings, false);

View File

@ -63,8 +63,10 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.admin.cluster.repositories.get.RestGetRepositoriesAction;
import org.elasticsearch.rest.action.admin.cluster.state.RestClusterStateAction;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
import java.io.IOException;
@ -96,7 +98,8 @@ import static org.hamcrest.Matchers.nullValue;
/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@ESIntegTestCase.SuppressLocalMode // TODO only restorePersistentSettingsTest needs this maybe factor out?
public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@Test

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import java.net.InetAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -42,9 +43,9 @@ public class ClientFailover {
// TODO: what is this? a public static void main test?!?!
final TransportClient client = TransportClient.builder().build()
.addTransportAddress(new InetSocketTransportAddress("localhost", 9300))
.addTransportAddress(new InetSocketTransportAddress("localhost", 9301))
.addTransportAddress(new InetSocketTransportAddress("localhost", 9302));
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9301))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9302));
final AtomicBoolean done = new AtomicBoolean();
final AtomicLong indexed = new AtomicLong();

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import java.net.InetAddress;
import java.util.Date;
/**
@ -49,7 +50,7 @@ public class ManyIndicesRemoteStressTest {
Node node = null;
// TODO: what is this? a public static void main test?!?!?!
if (true) {
client = TransportClient.builder().settings(Settings.EMPTY).build().addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
client = TransportClient.builder().settings(Settings.EMPTY).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
} else {
node = NodeBuilder.nodeBuilder().client(true).node();
client = node.client();

View File

@ -32,6 +32,7 @@ import org.apache.http.impl.client.HttpClients;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
@ -117,6 +118,7 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.elasticsearch.transport.TransportModule;
import org.hamcrest.Matchers;
import org.joda.time.DateTimeZone;
import org.junit.*;
@ -124,7 +126,9 @@ import org.junit.*;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@ -1553,49 +1557,51 @@ public abstract class ESIntegTestCase extends ESTestCase {
assertThat(clearResponse.isSucceeded(), equalTo(true));
}
private static ClusterScope getAnnotation(Class<?> clazz) {
private static <A extends Annotation> A getAnnotation(Class<?> clazz, Class<A> annotationClass) {
if (clazz == Object.class || clazz == ESIntegTestCase.class) {
return null;
}
ClusterScope annotation = clazz.getAnnotation(ClusterScope.class);
A annotation = clazz.getAnnotation(annotationClass);
if (annotation != null) {
return annotation;
}
return getAnnotation(clazz.getSuperclass());
return getAnnotation(clazz.getSuperclass(), annotationClass);
}
private Scope getCurrentClusterScope() {
return getCurrentClusterScope(this.getClass());
}
private static Scope getCurrentClusterScope(Class<?> clazz) {
ClusterScope annotation = getAnnotation(clazz);
ClusterScope annotation = getAnnotation(clazz, ClusterScope.class);
// if we are not annotated assume suite!
return annotation == null ? Scope.SUITE : annotation.scope();
}
private int getNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? -1 : annotation.numDataNodes();
}
private int getMinNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null || annotation.minNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES : annotation.minNumDataNodes();
}
private int getMaxNumDataNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null || annotation.maxNumDataNodes() == -1 ? InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES : annotation.maxNumDataNodes();
}
private int getNumClientNodes() {
ClusterScope annotation = getAnnotation(this.getClass());
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null ? InternalTestCluster.DEFAULT_NUM_CLIENT_NODES : annotation.numClientNodes();
}
private boolean randomDynamicTemplates() {
ClusterScope annotation = getAnnotation(this.getClass());
ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
return annotation == null || annotation.randomDynamicTemplates();
}
@ -1607,7 +1613,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* In other words subclasses must ensure this method is idempotent.
*/
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
Settings.Builder builder = settingsBuilder()
// Default the watermarks to absurdly low to prevent the tests
// from failing on nodes without enough disk space
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "1b")
@ -1615,8 +1621,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
.put("script.indexed", "on")
.put("script.inline", "on")
// wait short time for other active shards before actually deleting, default 30s not needed in tests
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS))
.build();
.put(IndicesStore.INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(1, TimeUnit.SECONDS));
return builder.build();
}
/**
@ -1629,7 +1635,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
return Settings.EMPTY;
}
private ExternalTestCluster buildExternalCluster(String clusterAddresses) {
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws UnknownHostException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
@ -1639,7 +1645,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid");
}
try {
transportAddresses[i++] = new InetSocketTransportAddress(split[0], Integer.valueOf(split[1]));
transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1]));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
}
@ -1693,7 +1699,18 @@ public abstract class ESIntegTestCase extends ESTestCase {
minNumDataNodes = getMinNumDataNodes();
maxNumDataNodes = getMaxNumDataNodes();
}
return new InternalTestCluster(seed, createTempDir(), minNumDataNodes, maxNumDataNodes,
SuppressLocalMode noLocal = getAnnotation(this.getClass(), SuppressLocalMode.class);
SuppressNetworkMode noNetwork = getAnnotation(this.getClass(), SuppressNetworkMode.class);
String nodeMode = InternalTestCluster.configuredNodeMode();
if (noLocal != null && noNetwork != null) {
throw new IllegalStateException("Can't suppress both network and local mode");
} else if (noLocal != null){
nodeMode = "network";
} else if (noNetwork != null) {
nodeMode = "local";
}
return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", settingsSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix);
}
@ -1715,7 +1732,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
* return a random ratio in the interval <tt>[0..1]</tt>
*/
protected double getPerTestTransportClientRatio() {
final ClusterScope annotation = getAnnotation(this.getClass());
final ClusterScope annotation = getAnnotation(this.getClass(), ClusterScope.class);
double perTestRatio = -1;
if (annotation != null) {
perTestRatio = annotation.transportClientRatio();
@ -1947,7 +1964,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
TransportAddress publishAddress = randomFrom(nodes).getHttp().address().publishAddress();
assertEquals(1, publishAddress.uniqueAddressTypeId());
InetSocketAddress address = ((InetSocketTransportAddress) publishAddress).address();
return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort());
return new HttpRequestBuilder(HttpClients.createDefault()).host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort());
}
/**
@ -1973,4 +1990,39 @@ public abstract class ESIntegTestCase extends ESTestCase {
@Inherited
public @interface SuiteScopeTestCase {
}
/**
* If used the test will never run in local mode.
*/
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SuppressLocalMode {}
/**
* If used the test will never run in network mode
*/
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SuppressNetworkMode {}
/**
* Annotation used to set if working multicast is required to run the test.
* By default, tests annotated with @Multicast won't be executed.
* Set -Dtests.multicast=true when running test to launch multicast tests
*/
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@TestGroup(enabled = false, sysProperty = "tests.multicast")
public @interface Multicast {
}
/**
* Returns true if tests can use multicast. Default is false.
* To disable an entire test use {@link org.elasticsearch.test.ESIntegTestCase.Multicast} instead
*/
protected boolean canUseMuticast() {
return Boolean.parseBoolean(System.getProperty("tests.multicast", "false"));
}
}

View File

@ -192,8 +192,6 @@ public final class InternalTestCluster extends TestCluster {
static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true;
public static final String NODE_MODE = nodeMode();
/* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */
private final NavigableMap<String, NodeAndClient> nodes = new TreeMap<>();
@ -227,16 +225,21 @@ public final class InternalTestCluster extends TestCluster {
private final Path baseDir;
private ServiceDisruptionScheme activeDisruptionScheme;
private String nodeMode;
public InternalTestCluster(long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes,
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix) {
this(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableHttpPipelining, nodePrefix);
this(nodeMode, clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableHttpPipelining, nodePrefix);
}
public InternalTestCluster(long clusterSeed, Path baseDir,
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir,
int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix) {
super(clusterSeed);
if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) {
throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode);
}
this.nodeMode = nodeMode;
this.baseDir = baseDir;
this.clusterName = clusterName;
if (minNumDataNodes < 0 || maxNumDataNodes < 0) {
@ -300,7 +303,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put("transport.tcp.port", BASE_PORT + "-" + (BASE_PORT + 100));
builder.put("http.port", BASE_PORT + 101 + "-" + (BASE_PORT + 200));
builder.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true);
builder.put("node.mode", NODE_MODE);
builder.put("node.mode", nodeMode);
builder.put("http.pipelining", enableHttpPipelining);
if (Strings.hasLength(System.getProperty("es.logger.level"))) {
builder.put("logger.level", System.getProperty("es.logger.level"));
@ -327,7 +330,7 @@ public final class InternalTestCluster extends TestCluster {
executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName));
}
public static String nodeMode() {
public static String configuredNodeMode() {
Builder builder = Settings.builder();
if (Strings.isEmpty(System.getProperty("es.node.mode")) && Strings.isEmpty(System.getProperty("es.node.local"))) {
return "local"; // default if nothing is specified
@ -354,11 +357,8 @@ public final class InternalTestCluster extends TestCluster {
return nodes.keySet().toArray(Strings.EMPTY_ARRAY);
}
private static boolean isLocalTransportConfigured() {
if ("local".equals(System.getProperty("es.node.mode", "network"))) {
return true;
}
return Boolean.parseBoolean(System.getProperty("es.node.local", "false"));
private boolean isLocalTransportConfigured() {
return "local".equals(nodeMode);
}
private Settings getSettings(int nodeOrdinal, long nodeSeed, Settings others) {
@ -378,7 +378,7 @@ public final class InternalTestCluster extends TestCluster {
return builder.build();
}
private static Settings getRandomNodeSettings(long seed) {
private Settings getRandomNodeSettings(long seed) {
Random random = new Random(seed);
Builder builder = Settings.settingsBuilder()
.put(SETTING_CLUSTER_NODE_SEED, seed);
@ -782,6 +782,10 @@ public final class InternalTestCluster extends TestCluster {
}
}
public String getNodeMode() {
return nodeMode;
}
private final class NodeAndClient implements Closeable {
private Node node;
private Client nodeClient;
@ -844,7 +848,7 @@ public final class InternalTestCluster extends TestCluster {
/* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down.
* we first need support of transportClientRatio as annotations or so
*/
return transportClient = new TransportClientFactory(false, settingsSource.transportClient(), baseDir).client(node, clusterName);
return transportClient = new TransportClientFactory(false, settingsSource.transportClient(), baseDir, nodeMode).client(node, clusterName);
}
void resetClient() throws IOException {
@ -901,11 +905,13 @@ public final class InternalTestCluster extends TestCluster {
private final boolean sniff;
private final Settings settings;
private final Path baseDir;
private final String nodeMode;
TransportClientFactory(boolean sniff, Settings settings, Path baseDir) {
TransportClientFactory(boolean sniff, Settings settings, Path baseDir, String nodeMode) {
this.sniff = sniff;
this.settings = settings != null ? settings : Settings.EMPTY;
this.baseDir = baseDir;
this.nodeMode = nodeMode;
}
public Client client(Node node, String clusterName) {
@ -916,7 +922,7 @@ public final class InternalTestCluster extends TestCluster {
.put("path.home", baseDir)
.put("name", TRANSPORT_CLIENT_PREFIX + node.settings().get("name"))
.put(ClusterName.SETTING, clusterName).put("client.transport.sniff", sniff)
.put("node.mode", nodeSettings.get("node.mode", NODE_MODE))
.put("node.mode", nodeSettings.get("node.mode", nodeMode))
.put("node.local", nodeSettings.get("node.local", ""))
.put("logger.prefix", nodeSettings.get("logger.prefix", ""))
.put("logger.level", nodeSettings.get("logger.level", "INFO"))

View File

@ -211,4 +211,6 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
* Returns the cluster name
*/
public abstract String getClusterName();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.test.discovery;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.google.common.primitives.Ints;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.InternalTestCluster;
@ -28,15 +29,17 @@ import org.elasticsearch.test.SettingsSource;
import org.elasticsearch.transport.local.LocalTransport;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
public class ClusterDiscoveryConfiguration extends SettingsSource {
static Settings DEFAULT_NODE_SETTINGS = Settings.settingsBuilder().put("discovery.type", "zen").build();
private static final String IP_ADDR = "127.0.0.1";
final int numOfNodes;
final Settings nodeSettings;
@ -63,29 +66,15 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
// this variable is incremented on each bind attempt and will maintain the next port that should be tried
private static int nextPort = calcBasePort();
// since we run multiple test iterations, we need some flexibility in the choice of ports a node can have (port may
// stay in use by previous iterations on some OSes - read CentOs). This controls the amount of ports each node
// is assigned. All ports in range will be added to the unicast hosts, which is OK because we know only one will be used.
private static final int NUM_PORTS_PER_NODE = 3;
private final String[] unicastHosts;
private final boolean localMode;
public UnicastZen(int numOfNodes) {
this(numOfNodes, numOfNodes);
}
private final int[] unicastHostOrdinals;
private final int[] unicastHostPorts;
public UnicastZen(int numOfNodes, Settings extraSettings) {
this(numOfNodes, numOfNodes, extraSettings);
}
public UnicastZen(int numOfNodes, int numOfUnicastHosts) {
this(numOfNodes, numOfUnicastHosts, Settings.EMPTY);
}
public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) {
super(numOfNodes, extraSettings);
int[] unicastHostOrdinals;
if (numOfUnicastHosts == numOfNodes) {
unicastHostOrdinals = new int[numOfNodes];
for (int i = 0; i < numOfNodes; i++) {
@ -98,8 +87,8 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
}
unicastHostOrdinals = Ints.toArray(ordinals);
}
this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode);
this.unicastHostPorts = unicastHostPorts(numOfNodes);
assert unicastHostOrdinals.length <= unicastHostPorts.length;
}
public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) {
@ -108,59 +97,70 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) {
super(numOfNodes, extraSettings);
this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode);
this.unicastHostOrdinals = unicastHostOrdinals;
this.unicastHostPorts = unicastHostPorts(numOfNodes);
assert unicastHostOrdinals.length <= unicastHostPorts.length;
}
private static int calcBasePort() {
return 30000 + InternalTestCluster.BASE_PORT;
}
private static String[] buildUnicastHostSetting(int[] unicastHostOrdinals, boolean localMode) {
ArrayList<String> unicastHosts = new ArrayList<>();
for (int i = 0; i < unicastHostOrdinals.length; i++) {
final int hostOrdinal = unicastHostOrdinals[i];
if (localMode) {
unicastHosts.add("node_" + hostOrdinal);
} else {
// we need to pin the node port & host so we'd know where to point things
final int[] ports = nodePorts(hostOrdinal);
for (int port : ports) {
unicastHosts.add("localhost:" + port);
}
}
}
return unicastHosts.toArray(new String[unicastHosts.size()]);
}
@Override
public Settings node(int nodeOrdinal) {
Settings.Builder builder = Settings.builder()
.put("discovery.zen.ping.multicast.enabled", false);
Settings.Builder builder = Settings.builder();
if (localMode) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
String[] unicastHosts = new String[unicastHostOrdinals.length];
if (nodeOrdinal >= unicastHostPorts.length) {
throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]");
} else {
// we need to pin the node port & host so we'd know where to point things
String ports = "";
for (int port : nodePorts(nodeOrdinal)) {
ports += "," + port;
builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]);
builder.put("transport.host", IP_ADDR); // only bind on one IF we use v4 here by default
builder.put("transport.bind_host", IP_ADDR);
builder.put("transport.publish_host", IP_ADDR);
builder.put("http.enabled", false);
for (int i = 0; i < unicastHostOrdinals.length; i++) {
unicastHosts[i] = IP_ADDR + ":" + (unicastHostPorts[unicastHostOrdinals[i]]);
}
builder.put("transport.tcp.port", ports.substring(1));
builder.put("transport.host", "localhost");
}
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
return builder.put(super.node(nodeOrdinal)).build();
}
protected static int[] nodePorts(int nodeOridnal) {
int[] unicastHostPorts = new int[NUM_PORTS_PER_NODE];
@SuppressForbidden(reason = "we know we pass a IP address")
protected synchronized static int[] unicastHostPorts(int numHosts) {
int[] unicastHostPorts = new int[numHosts];
final int basePort = calcBasePort() + nodeOridnal * NUM_PORTS_PER_NODE;
final int basePort = calcBasePort();
final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM;
int tries = 0;
for (int i = 0; i < unicastHostPorts.length; i++) {
unicastHostPorts[i] = basePort + i;
}
boolean foundPortInRange = false;
while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) {
try (ServerSocket serverSocket = new ServerSocket()) {
// Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it.
serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress());
serverSocket.bind(new InetSocketAddress(IP_ADDR, nextPort));
// bind was a success
foundPortInRange = true;
unicastHostPorts[i] = nextPort;
} catch (IOException e) {
// Do nothing
}
nextPort++;
if (nextPort >= maxPort) {
// Roll back to the beginning of the range and do not go into another JVM's port range
nextPort = basePort;
}
tries++;
}
if (!foundPortInRange) {
throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports");
}
}
return unicastHostPorts;
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.elasticsearch.test.rest.client.http.HttpResponse;
@ -247,7 +248,7 @@ public class RestClient implements Closeable {
return new HttpRequestBuilder(httpClient)
.addHeaders(headers)
.protocol(protocol)
.host(address.getHostName()).port(address.getPort());
.host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort());
}
protected HttpRequestBuilder httpRequestBuilder() {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.http.HttpServerTransport;
@ -77,7 +78,7 @@ public class HttpRequestBuilder {
public HttpRequestBuilder httpTransport(HttpServerTransport httpServerTransport) {
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().publishAddress();
return host(transportAddress.address().getHostName()).port(transportAddress.address().getPort());
return host(NetworkAddress.formatAddress(transportAddress.address().getAddress())).port(transportAddress.address().getPort());
}
public HttpRequestBuilder port(int port) {

View File

@ -55,8 +55,8 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster0 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster1 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
assertClusters(cluster0, cluster1, true);
}
@ -99,8 +99,8 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = "foobar";
Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster0 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
InternalTestCluster cluster1 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, settingsSource, numClientNodes, enableHttpPipelining, nodePrefix);
assertClusters(cluster0, cluster1, false);
long seed = randomLong();

View File

@ -28,10 +28,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.BlockingQueue;
/** A transport class that doesn't send anything but rather captures all requests for inspection from tests */
@ -114,7 +111,8 @@ public class CapturingTransport implements Transport {
}
@Override
public TransportAddress[] addressesFromString(String address) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
// WTF
return new TransportAddress[0];
}
@ -177,4 +175,9 @@ public class CapturingTransport implements Transport {
public void close() {
}
@Override
public List<String> getLocalAddresses() {
return Collections.EMPTY_LIST;
}
}

View File

@ -362,8 +362,8 @@ public class MockTransportService extends TransportService {
}
@Override
public TransportAddress[] addressesFromString(String address) throws Exception {
return transport.addressesFromString(address);
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
return transport.addressesFromString(address, perAddressLimit);
}
@Override
@ -401,6 +401,11 @@ public class MockTransportService extends TransportService {
return transport.serverOpen();
}
@Override
public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}
@Override
public Lifecycle.State lifecycleState() {
return transport.lifecycleState();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.transport;
import com.google.common.base.Charsets;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -39,6 +40,7 @@ import org.junit.Test;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.Socket;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -55,7 +57,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
private ThreadPool threadPool;
private NettyTransport nettyTransport;
private int port;
private String host;
private InetAddress host;
@Before
public void startThreadPool() {
@ -70,7 +72,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) nettyTransport.boundAddress().boundAddress();
port = transportAddress.address().getPort();
host = transportAddress.address().getHostString();
host = transportAddress.address().getAddress();
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.test.ESIntegTestCase;
@ -32,6 +33,7 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
import java.net.InetAddress;
import java.util.Locale;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -71,7 +73,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
.put("path.home", createTempDir().toString())
.build();
try (TransportClient transportClient = TransportClient.builder().settings(settings).loadConfigSettings(false).build()) {
transportClient.addTransportAddress(new InetSocketTransportAddress("127.0.0.1", randomPort));
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), randomPort));
ClusterHealthResponse response = transportClient.admin().cluster().prepareHealth().get();
assertThat(response.getStatus(), is(ClusterHealthStatus.GREEN));
}
@ -93,7 +95,7 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
// publish address
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class));
InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress();
assertThat(publishAddress.address().getHostName(), is("127.0.0.7"));
assertThat(NetworkAddress.formatAddress(publishAddress.address().getAddress()), is("127.0.0.7"));
assertThat(publishAddress.address().getPort(), is(4321));
}
}

View File

@ -170,7 +170,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
// Set SO_REUSEADDR as we may bind here and not be able
// to reuse the address immediately without it.
serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress());
serverSocket.bind(new InetSocketAddress(nextPort));
serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), nextPort));
// bind was a success
logger.debug("port [{}] available.", nextPort);
@ -199,7 +199,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
private void assertConnectionRefused(int port) throws Exception {
try {
trySocketConnection(new InetSocketTransportAddress("localhost", port).address());
trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName("localhost"), port).address());
fail("Expected to get exception when connecting to port " + port);
} catch (IOException e) {
// expected
@ -213,7 +213,7 @@ public class NettyTransportMultiPortTests extends ESTestCase {
private void assertPortIsBound(String host, int port) throws Exception {
logger.info("Trying to connect to [{}]:[{}]", host, port);
trySocketConnection(new InetSocketTransportAddress(host, port).address());
trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName(host), port).address());
}
private void trySocketConnection(InetSocketAddress address) throws Exception {

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
/** Unit tests for NettyTransport */
public class NettyTransportTests extends ESTestCase {
/** Test ipv4 host with a default port works */
public void testParseV4DefaultPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
}
/** Test ipv4 host with a default port range works */
public void testParseV4DefaultRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
assertEquals("127.0.0.1", addresses[1].getAddress());
assertEquals(1235, addresses[1].getPort());
}
/** Test ipv4 host with port works */
public void testParseV4WithPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
assertEquals(2345, addresses[0].getPort());
}
/** Test ipv4 host with port range works */
public void testParseV4WithPortRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("127.0.0.1", addresses[0].getAddress());
assertEquals(2345, addresses[0].getPort());
assertEquals("127.0.0.1", addresses[1].getAddress());
assertEquals(2346, addresses[1].getPort());
}
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
public void testParseV6UnBracketed() throws Exception {
try {
NettyTransport.parse("::1", "1234", Integer.MAX_VALUE);
fail("should have gotten exception");
} catch (IllegalArgumentException expected) {
assertTrue(expected.getMessage().contains("must be bracketed"));
}
}
/** Test ipv6 host with a default port works */
public void testParseV6DefaultPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("::1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
}
/** Test ipv6 host with a default port range works */
public void testParseV6DefaultRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("::1", addresses[0].getAddress());
assertEquals(1234, addresses[0].getPort());
assertEquals("::1", addresses[1].getAddress());
assertEquals(1235, addresses[1].getPort());
}
/** Test ipv6 host with port works */
public void testParseV6WithPort() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
assertEquals(1, addresses.length);
assertEquals("::1", addresses[0].getAddress());
assertEquals(2345, addresses[0].getPort());
}
/** Test ipv6 host with port range works */
public void testParseV6WithPortRange() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
assertEquals(2, addresses.length);
assertEquals("::1", addresses[0].getAddress());
assertEquals(2345, addresses[0].getPort());
assertEquals("::1", addresses[1].getAddress());
assertEquals(2346, addresses[1].getPort());
}
/** Test per-address limit */
public void testAddressLimit() throws Exception {
TransportAddress[] addresses = NettyTransport.parse("[::1]:100-200", "1000", 3);
assertEquals(3, addresses.length);
assertEquals(100, addresses[0].getPort());
assertEquals(101, addresses[1].getPort());
assertEquals(102, addresses[2].getPort());
}
}

View File

@ -31,6 +31,9 @@ import org.elasticsearch.transport.AbstractSimpleTransportTests;
import org.elasticsearch.transport.ConnectTransportException;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
@Override
@ -44,7 +47,7 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
}
@Test(expected = ConnectTransportException.class)
public void testConnectException() {
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876), Version.CURRENT));
public void testConnectException() throws UnknownHostException {
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9876), Version.CURRENT));
}
}

View File

@ -70,7 +70,7 @@ public class TribeIT extends ESIntegTestCase {
public static void setupSecondCluster() throws Exception {
ESIntegTestCase.beforeClass();
// create another cluster
cluster2 = new InternalTestCluster(randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, SECOND_CLUSTER_NODE_PREFIX);
cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, SECOND_CLUSTER_NODE_PREFIX);
cluster2.beforeTest(getRandom(), 0.1);
cluster2.ensureAtLeastNumDataNodes(2);
}

View File

@ -48,7 +48,7 @@ public class TribeUnitTests extends ESTestCase {
private static Node tribe1;
private static Node tribe2;
private static final String NODE_MODE = InternalTestCluster.nodeMode();
private static final String NODE_MODE = InternalTestCluster.configuredNodeMode();
@BeforeClass
public static void createTribes() {

View File

@ -136,8 +136,7 @@
<attribute name="home" default="${integ.scratch}/elasticsearch-${elasticsearch.version}"/>
<attribute name="spawn" default="true"/>
<attribute name="args" default="${integ.args}"/>
<attribute name="es.unicast.enabled" default="false"/>
<attribute name="es.unicast.hosts" default=""/>
<attribute name="es.unicast.hosts" default="localhost:${integ.transport.port}"/>
<attribute name="es.cluster.name" default="${integ.cluster.name}"/>
<attribute name="es.http.port" default="${integ.http.port}"/>
<attribute name="es.transport.tcp.port" default="${integ.transport.port}"/>
@ -166,7 +165,6 @@
<arg value="-Des.http.port=@{es.http.port}"/>
<arg value="-Des.transport.tcp.port=@{es.transport.tcp.port}"/>
<arg value="-Des.pidfile=@{es.pidfile}"/>
<arg value="-Des.discovery.zen.ping.unicast.enabled=@{es.unicast.enabled}"/>
<arg value="-Des.discovery.zen.ping.unicast.hosts=@{es.unicast.hosts}"/>
<arg value="-Des.path.repo=@{home}/repo"/>
<arg value="-Des.path.shared_data=@{home}/../"/>
@ -243,7 +241,7 @@
<attribute name="es.pidfile" default="${integ.pidfile}"/>
<attribute name="es.peer.list" />
<sequential>
<startup-elasticsearch es.pidfile="@{es.pidfile}" es.unicast.enabled="true"
<startup-elasticsearch es.pidfile="@{es.pidfile}"
es.transport.tcp.port="@{es.transport.port}" es.http.port="@{es.http.port}"
es.unicast.hosts="@{es.peer.list}"/>
</sequential>

View File

@ -59,3 +59,30 @@ java.nio.file.Files#isHidden(java.nio.file.Path) @ Dependent on the operating sy
java.nio.file.Files#getFileStore(java.nio.file.Path) @ Use Environment.getFileStore() instead, impacted by JDK-8034057
java.nio.file.Files#isWritable(java.nio.file.Path) @ Use Environment.isWritable() instead, impacted by JDK-8034057
@defaultMessage Resolve hosts explicitly to the address(es) you want with InetAddress.
java.net.InetSocketAddress#<init>(java.lang.String,int)
java.net.Socket#<init>(java.lang.String,int)
java.net.Socket#<init>(java.lang.String,int,java.net.InetAddress,int)
@defaultMessage Don't bind to wildcard addresses. Be specific.
java.net.DatagramSocket#<init>()
java.net.DatagramSocket#<init>(int)
java.net.InetSocketAddress#<init>(int)
java.net.MulticastSocket#<init>()
java.net.MulticastSocket#<init>(int)
java.net.ServerSocket#<init>(int)
java.net.ServerSocket#<init>(int,int)
@defaultMessage use NetworkAddress format/formatAddress to print IP or IP+ports
java.net.InetAddress#toString()
java.net.InetAddress#getHostAddress()
java.net.Inet4Address#getHostAddress()
java.net.Inet6Address#getHostAddress()
java.net.InetSocketAddress#toString()
@defaultMessage avoid DNS lookups by accident: if you have a valid reason, then @SuppressWarnings with that reason so its completely clear
java.net.InetAddress#getHostName()
java.net.InetAddress#getCanonicalHostName()
java.net.InetSocketAddress#getHostName() @ Use getHostString() instead, which avoids a DNS lookup

View File

@ -71,13 +71,10 @@
#
# --------------------------------- Discovery ----------------------------------
#
# Elasticsearch nodes will find each other via multicast, by default.
#
# To use the unicast discovery, disable the multicast discovery:
#
# discovery.zen.ping.multicast.enabled: false
# Elasticsearch nodes will find each other via unicast, by default.
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
# discovery.zen.ping.unicast.hosts: ["host1", "host2"]
#
@ -85,6 +82,10 @@
#
# discovery.zen.minimum_master_nodes: 3
#
# To use multicast for discovery, enable it:
#
# discovery.zen.ping.multicast.enabled: true
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
#

View File

@ -203,7 +203,7 @@ We can see that our cluster named "elasticsearch" is up with a green status.
Whenever we ask for the cluster health, we either get green, yellow, or red. Green means everything is good (cluster is fully functional), yellow means all data is available but some replicas are not yet allocated (cluster is fully functional), and red means some data is not available for whatever reason. Note that even if a cluster is red, it still is partially functional (i.e. it will continue to serve search requests from the available shards) but you will likely need to fix it ASAP since you have missing data.
Also from the above response, we can see and total of 1 node and that we have 0 shards since we have no data in it yet. Note that since we are using the default cluster name (elasticsearch) and since Elasticsearch uses multicast network discovery by default to find other nodes, it is possible that you could accidentally start up more than one node in your network and have them all join a single cluster. In this scenario, you may see more than 1 node in the above response.
Also from the above response, we can see and total of 1 node and that we have 0 shards since we have no data in it yet. Note that since we are using the default cluster name (elasticsearch) and since Elasticsearch uses unicast network discovery by default to find other nodes on the same machine, it is possible that you could accidentally start up more than one node on your computer and have them all join a single cluster. In this scenario, you may see more than 1 node in the above response.
We can also get a list of nodes in our cluster as follows:

View File

@ -2,7 +2,7 @@
=== Zen Discovery
The zen discovery is the built in discovery module for elasticsearch and
the default. It provides both multicast and unicast discovery as well
the default. It provides both unicast and multicast discovery as well
being easily extended to support cloud environments.
The zen discovery is integrated with other modules, for example, all
@ -40,7 +40,7 @@ respond to. It provides the following settings with the
|`address` |The address to bind to, defaults to `null` which means it
will bind `network.bind_host`
|`enabled` |Whether multicast ping discovery is enabled. Defaults to `true`.
|`enabled` |Whether multicast ping discovery is enabled. Defaults to `false`.
|=======================================================================
[float]
@ -57,7 +57,8 @@ as gossip routers. It provides the following settings with the
|Setting |Description
|`hosts` |Either an array setting or a comma delimited setting. Each
value is either in the form of `host:port`, or in the form of
`host[port1-port2]`.
`host:port1-port2`. Note that IPv6 hosts must be bracketed. Defaults to
`127.0.0.1, [::1]`
|=======================================================================
The unicast discovery uses the

View File

@ -156,9 +156,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
}
if (address != null) {
try {
TransportAddress[] addresses = transportService.addressesFromString(address);
// we only limit to 1 addresses, makes no sense to ping 100 ports
for (int i = 0; (i < addresses.length && i < UnicastZenPing.LIMIT_PORTS_COUNT); i++) {
// we only limit to 1 port per address, makes no sense to ping 100 ports
TransportAddress[] addresses = transportService.addressesFromString(address, 1);
for (int i = 0; i < addresses.length; i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceId() + "-" + i, addresses[i], version.minimumCompatibilityVersion()));
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.azure;
import com.microsoft.windowsazure.management.compute.models.*;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.AzureServiceRemoteException;
@ -28,6 +29,7 @@ import org.elasticsearch.cloud.azure.management.AzureComputeService.Discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -37,6 +39,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Locale;
import java.util.List;
@ -216,9 +219,9 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
if (privateIp != null) {
if (privateIp.equals(ipAddress)) {
logger.trace("adding ourselves {}", ipAddress);
logger.trace("adding ourselves {}", NetworkAddress.format(ipAddress));
}
networkAddress = privateIp.getHostAddress();
networkAddress = NetworkAddress.formatAddress(privateIp);
} else {
logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName());
}
@ -231,7 +234,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
continue;
}
networkAddress = endpoint.getVirtualIPAddress().getHostAddress() + ":" + endpoint.getPort();
networkAddress = NetworkAddress.formatAddress(new InetSocketAddress(endpoint.getVirtualIPAddress(), endpoint.getPort()));
}
if (networkAddress == null) {
@ -251,11 +254,13 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
}
try {
TransportAddress[] addresses = transportService.addressesFromString(networkAddress);
// we only limit to 1 addresses, makes no sense to ping 100 ports
logger.trace("adding {}, transport_address {}", networkAddress, addresses[0]);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), addresses[0],
// we only limit to 1 port per address, makes no sense to ping 100 ports
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
for (TransportAddress address : addresses) {
logger.trace("adding {}, transport_address {}", networkAddress, address);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address,
version.minimumCompatibilityVersion()));
}
} catch (Exception e) {
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
}

View File

@ -22,12 +22,14 @@ package org.elasticsearch.discovery.gce;
import com.google.api.services.compute.model.AccessConfig;
import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.NetworkInterface;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.gce.GceComputeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -114,7 +116,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
try {
InetAddress inetAddress = networkService.resolvePublishHostAddress(null);
if (inetAddress != null) {
ipAddress = inetAddress.getHostAddress();
ipAddress = NetworkAddress.formatAddress(inetAddress);
}
} catch (IOException e) {
// We can't find the publish host address... Hmmm. Too bad :-(
@ -224,13 +226,15 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
}
// ip_private is a single IP Address. We need to build a TransportAddress from it
TransportAddress[] addresses = transportService.addressesFromString(address);
// If user has set `es_port` metadata, we don't need to ping all ports
// we only limit to 1 addresses, makes no sense to ping 100 ports
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
ip_private, addresses[0], status);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, addresses[0], version.minimumCompatibilityVersion()));
TransportAddress[] addresses = transportService.addressesFromString(address, 1);
for (TransportAddress transportAddress : addresses) {
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
ip_private, transportAddress, status);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress, version.minimumCompatibilityVersion()));
}
}
} catch (Exception e) {
logger.warn("failed to add {}, address {}", e, name, ip_private);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.example;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ExternalTestCluster;
import org.elasticsearch.test.TestCluster;
@ -46,7 +47,7 @@ public class SiteContentsIT extends ESIntegTestCase {
for (InetSocketAddress address : externalCluster.httpAddresses()) {
RestResponse restResponse = new RestResponse(
new HttpRequestBuilder(httpClient)
.host(address.getHostName()).port(address.getPort())
.host(NetworkAddress.formatAddress(address.getAddress())).port(address.getPort())
.path("/_plugin/site-example/")
.method("GET").execute());
assertEquals(200, restResponse.getStatusCode());

View File

@ -103,6 +103,7 @@
<tests.rest.spec></tests.rest.spec>
<tests.rest.load_packaged></tests.rest.load_packaged>
<tests.network></tests.network>
<tests.multicast></tests.multicast>
<tests.cluster></tests.cluster>
<tests.filter></tests.filter>
<env.ES_TEST_LOCAL></env.ES_TEST_LOCAL>