Reject port ranges in `discovery.seed_hosts` (#41905)
Today Elasticsearch accepts, but silently ignores, port ranges in the `discovery.seed_hosts` setting: ``` discovery.seed_hosts: 10.1.2.3:9300-9400 ``` Silently ignoring part of a setting like this is trappy. With this change we reject seed host addresses of this form. Closes #40786 Backport of #41404
This commit is contained in:
parent
e04953a2bf
commit
4c909e93bb
|
@ -17,3 +17,15 @@ coming[7.2.0]
|
|||
//tag::notable-breaking-changes[]
|
||||
|
||||
// end::notable-breaking-changes[]
|
||||
|
||||
[[breaking_72_discovery_changes]]
|
||||
=== Discovery changes
|
||||
|
||||
[float]
|
||||
==== Only a single port may be given for each seed host.
|
||||
|
||||
In earlier versions you could include a range of ports in entries in the
|
||||
`discovery.seed_hosts` list, but {es} used only the first port in the range and
|
||||
unexpectedly ignored the rest. For instance if you set `discovery.seed_hosts:
|
||||
"10.11.12.13:9300-9310"` then {es} would only use `10.11.12.13:9300` for
|
||||
discovery. Seed host addresses containing port ranges are now rejected.
|
||||
|
|
|
@ -208,8 +208,7 @@ public class AzureSeedHostsProvider implements SeedHostsProvider {
|
|||
}
|
||||
|
||||
try {
|
||||
// we only limit to 1 port per address, makes no sense to ping 100 ports
|
||||
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
|
||||
TransportAddress[] addresses = transportService.addressesFromString(networkAddress);
|
||||
for (TransportAddress address : addresses) {
|
||||
logger.trace("adding {}, transport_address {}", networkAddress, address);
|
||||
dynamicHosts.add(address);
|
||||
|
|
|
@ -174,8 +174,7 @@ class AwsEc2SeedHostsProvider implements SeedHostsProvider {
|
|||
}
|
||||
if (address != null) {
|
||||
try {
|
||||
// we only limit to 1 port per address, makes no sense to ping 100 ports
|
||||
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
|
||||
final TransportAddress[] addresses = transportService.addressesFromString(address);
|
||||
for (int i = 0; i < addresses.length; i++) {
|
||||
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
|
||||
dynamicHosts.add(addresses[i]);
|
||||
|
|
|
@ -77,7 +77,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
|||
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
|
||||
new NoneCircuitBreakerService()) {
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
// we just need to ensure we don't resolve DNS here
|
||||
return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())};
|
||||
}
|
||||
|
|
|
@ -233,8 +233,7 @@ public class GceSeedHostsProvider implements SeedHostsProvider {
|
|||
|
||||
// ip_private is a single IP Address. We need to build a TransportAddress from it
|
||||
// If user has set `es_port` metadata, we don't need to ping all ports
|
||||
// we only limit to 1 addresses, makes no sense to ping 100 ports
|
||||
TransportAddress[] addresses = transportService.addressesFromString(address, 1);
|
||||
TransportAddress[] addresses = transportService.addressesFromString(address);
|
||||
|
||||
for (TransportAddress transportAddress : addresses) {
|
||||
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
|
||||
|
|
|
@ -75,7 +75,7 @@ public class FileBasedSeedHostsProvider implements SeedHostsProvider {
|
|||
|
||||
@Override
|
||||
public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
|
||||
final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1);
|
||||
final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList());
|
||||
logger.debug("seed addresses: {}", transportAddresses);
|
||||
return transportAddresses;
|
||||
}
|
||||
|
|
|
@ -36,10 +36,9 @@ public interface SeedHostsProvider {
|
|||
|
||||
/**
|
||||
* Helper object that allows to resolve a list of hosts to a list of transport addresses.
|
||||
* Each host is resolved into a transport address (or a collection of addresses if the
|
||||
* number of ports is greater than one)
|
||||
* Each host is resolved into a transport address
|
||||
*/
|
||||
interface HostsResolver {
|
||||
List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
|
||||
List<TransportAddress> resolveHosts(List<String> hosts);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -116,7 +116,6 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
|
|||
* @param executorService the executor service used to parallelize hostname lookups
|
||||
* @param logger logger used for logging messages regarding hostname lookups
|
||||
* @param hosts the hosts to resolve
|
||||
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
|
||||
* @param transportService the transport service
|
||||
* @param resolveTimeout the timeout before returning from hostname lookups
|
||||
* @return a list of resolved transport addresses
|
||||
|
@ -125,7 +124,6 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
|
|||
final ExecutorService executorService,
|
||||
final Logger logger,
|
||||
final List<String> hosts,
|
||||
final int limitPortCounts,
|
||||
final TransportService transportService,
|
||||
final TimeValue resolveTimeout) {
|
||||
Objects.requireNonNull(executorService);
|
||||
|
@ -140,7 +138,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
|
|||
final List<Callable<TransportAddress[]>> callables =
|
||||
hosts
|
||||
.stream()
|
||||
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
|
||||
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn))
|
||||
.collect(Collectors.toList());
|
||||
final List<Future<TransportAddress[]>> futures;
|
||||
try {
|
||||
|
@ -224,9 +222,8 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
|
|||
}
|
||||
|
||||
List<TransportAddress> providedAddresses
|
||||
= hostsProvider.getSeedAddresses((hosts, limitPortCounts)
|
||||
-> resolveHostsLists(executorService.get(), logger, hosts, limitPortCounts,
|
||||
transportService, resolveTimeout));
|
||||
= hostsProvider.getSeedAddresses(hosts ->
|
||||
resolveHostsLists(executorService.get(), logger, hosts, transportService, resolveTimeout));
|
||||
|
||||
consumer.accept(providedAddresses);
|
||||
}
|
||||
|
|
|
@ -50,12 +50,7 @@ public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {
|
|||
public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING =
|
||||
Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope);
|
||||
|
||||
// these limits are per-address
|
||||
private static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
|
||||
private static final int LIMIT_LOCAL_PORTS_COUNT = 5;
|
||||
|
||||
private final List<String> configuredHosts;
|
||||
private final int limitPortCounts;
|
||||
|
||||
public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) {
|
||||
if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
|
||||
|
@ -66,15 +61,11 @@ public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {
|
|||
}
|
||||
configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
// we only limit to 1 address, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
} else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) {
|
||||
configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings);
|
||||
// we only limit to 1 address, makes no sense to ping 100 ports
|
||||
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
|
||||
} else {
|
||||
// if unicast hosts are not specified, fill with simple defaults on the local machine
|
||||
configuredHosts = transportService.getLocalAddresses();
|
||||
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
|
||||
configuredHosts = transportService.getDefaultSeedAddresses();
|
||||
}
|
||||
|
||||
logger.debug("using initial hosts {}", configuredHosts);
|
||||
|
@ -82,6 +73,6 @@ public class SettingsBasedSeedHostsProvider implements SeedHostsProvider {
|
|||
|
||||
@Override
|
||||
public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) {
|
||||
return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
|
||||
return hostsResolver.resolveHosts(configuredHosts);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -144,8 +144,7 @@ public class UnicastZenPing implements ZenPing {
|
|||
}
|
||||
|
||||
private SeedHostsProvider.HostsResolver createHostsResolver() {
|
||||
return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
|
||||
limitPortCounts, transportService, resolveTimeout);
|
||||
return hosts -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, transportService, resolveTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -86,6 +86,7 @@ import java.util.concurrent.locks.ReadWriteLock;
|
|||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.common.transport.NetworkExceptionHelper.isCloseConnectionException;
|
||||
|
@ -102,6 +103,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
|
||||
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
|
||||
|
||||
// this limit is per-address
|
||||
private static final int LIMIT_LOCAL_PORTS_COUNT = 6;
|
||||
|
||||
protected final Settings settings;
|
||||
protected final ThreadPool threadPool;
|
||||
protected final PageCacheRecycler pageCacheRecycler;
|
||||
|
@ -311,14 +315,20 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
List<String> local = new ArrayList<>();
|
||||
local.add("127.0.0.1");
|
||||
// check if v6 is supported, if so, v4 will also work via mapped addresses.
|
||||
if (NetworkUtils.SUPPORTS_V6) {
|
||||
local.add("[::1]"); // may get ports appended!
|
||||
}
|
||||
return local;
|
||||
return local.stream()
|
||||
.flatMap(
|
||||
address -> Arrays.stream(defaultPortRange())
|
||||
.limit(LIMIT_LOCAL_PORTS_COUNT)
|
||||
.mapToObj(port -> address + ":" + port)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
protected void bindServer(ProfileSettings profileSettings) {
|
||||
|
@ -456,8 +466,17 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
return parse(address, defaultPortRange()[0]);
|
||||
}
|
||||
|
||||
private int[] defaultPortRange() {
|
||||
return new PortsRange(
|
||||
settings.get(
|
||||
TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(TransportSettings.DEFAULT_PROFILE).getKey(),
|
||||
TransportSettings.PORT.get(settings)
|
||||
)
|
||||
).ports();
|
||||
}
|
||||
|
||||
// this code is a take on guava's HostAndPort, like a HostAndPortRange
|
||||
|
@ -467,9 +486,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");
|
||||
|
||||
/**
|
||||
* parse a hostname+port range spec into its equivalent addresses
|
||||
* parse a hostname+port spec into its equivalent addresses
|
||||
*/
|
||||
static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
|
||||
static TransportAddress[] parse(String hostPortString, int defaultPort) throws UnknownHostException {
|
||||
Objects.requireNonNull(hostPortString);
|
||||
String host;
|
||||
String portString = null;
|
||||
|
@ -498,22 +517,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
|
|||
}
|
||||
}
|
||||
|
||||
int port;
|
||||
// if port isn't specified, fill with the default
|
||||
if (portString == null || portString.isEmpty()) {
|
||||
portString = defaultPortRange;
|
||||
port = defaultPort;
|
||||
} else {
|
||||
port = Integer.parseInt(portString);
|
||||
}
|
||||
|
||||
// generate address for each port in the range
|
||||
Set<InetAddress> addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
|
||||
List<TransportAddress> transportAddresses = new ArrayList<>();
|
||||
int[] ports = new PortsRange(portString).ports();
|
||||
int limit = Math.min(ports.length, perAddressLimit);
|
||||
for (int i = 0; i < limit; i++) {
|
||||
for (InetAddress address : addresses) {
|
||||
transportAddresses.add(new TransportAddress(address, ports[i]));
|
||||
}
|
||||
}
|
||||
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
|
||||
return Arrays.stream(InetAddress.getAllByName(host))
|
||||
.distinct()
|
||||
.map(address -> new TransportAddress(address, port))
|
||||
.toArray(TransportAddress[]::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -68,12 +68,12 @@ public interface Transport extends LifecycleComponent {
|
|||
/**
|
||||
* Returns an address from its string representation.
|
||||
*/
|
||||
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;
|
||||
TransportAddress[] addressesFromString(String address) throws UnknownHostException;
|
||||
|
||||
/**
|
||||
* Returns a list of all local adresses for this transport
|
||||
* Returns a list of all local addresses for this transport
|
||||
*/
|
||||
List<String> getLocalAddresses();
|
||||
List<String> getDefaultSeedAddresses();
|
||||
|
||||
default CircuitBreaker getInFlightRequestBreaker() {
|
||||
return new NoopCircuitBreaker("in-flight-noop");
|
||||
|
|
|
@ -313,8 +313,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
return transport.boundAddress();
|
||||
}
|
||||
|
||||
public List<String> getLocalAddresses() {
|
||||
return transport.getLocalAddresses();
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
return transport.getDefaultSeedAddresses();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -750,8 +750,8 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
|
|||
return true;
|
||||
}
|
||||
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
return transport.addressesFromString(address, perAddressLimit);
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
return transport.addressesFromString(address);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -170,7 +170,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
|
|
@ -128,7 +128,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
|
|||
threadPool = new TestThreadPool("transport-client-nodes-service-tests");
|
||||
transport = new FailAndRetryMockTransport<TestResponse>(random(), clusterName) {
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -401,7 +401,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
|
||||
public TransportAddress[] addressesFromString(String address) {
|
||||
return new TransportAddress[0];
|
||||
}
|
||||
|
||||
|
@ -440,7 +440,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -115,9 +115,8 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase {
|
|||
|
||||
public void testUnicastHostsDoesNotExist() {
|
||||
final FileBasedSeedHostsProvider provider = new FileBasedSeedHostsProvider(createTempDir().toAbsolutePath());
|
||||
final List<TransportAddress> addresses = provider.getSeedAddresses((hosts, limitPortCounts) ->
|
||||
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
|
||||
TimeValue.timeValueSeconds(10)));
|
||||
final List<TransportAddress> addresses = provider.getSeedAddresses(hosts ->
|
||||
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10)));
|
||||
assertEquals(0, addresses.size());
|
||||
}
|
||||
|
||||
|
@ -145,8 +144,7 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase {
|
|||
writer.write(String.join("\n", hostEntries));
|
||||
}
|
||||
|
||||
return new FileBasedSeedHostsProvider(configPath).getSeedAddresses((hosts, limitPortCounts) ->
|
||||
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
|
||||
TimeValue.timeValueSeconds(10)));
|
||||
return new FileBasedSeedHostsProvider(configPath).getSeedAddresses(hosts ->
|
||||
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,47 +147,6 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
assertThat(resolvedAddressesRef.get(), equalTo(transportAddresses));
|
||||
}
|
||||
|
||||
public void testPortLimit() {
|
||||
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||
final Transport transport = new MockNioTransport(
|
||||
Settings.EMPTY,
|
||||
Version.CURRENT,
|
||||
threadPool,
|
||||
networkService,
|
||||
PageCacheRecycler.NON_RECYCLING_INSTANCE,
|
||||
new NamedWriteableRegistry(Collections.emptyList()),
|
||||
new NoneCircuitBreakerService()) {
|
||||
|
||||
@Override
|
||||
public BoundTransportAddress boundAddress() {
|
||||
return new BoundTransportAddress(
|
||||
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)},
|
||||
new TransportAddress(InetAddress.getLoopbackAddress(), 9500)
|
||||
);
|
||||
}
|
||||
};
|
||||
closeables.push(transport);
|
||||
final TransportService transportService =
|
||||
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
|
||||
Collections.emptySet());
|
||||
closeables.push(transportService);
|
||||
final int limitPortCounts = randomIntBetween(1, 10);
|
||||
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Collections.singletonList("127.0.0.1"),
|
||||
limitPortCounts,
|
||||
transportService,
|
||||
TimeValue.timeValueSeconds(30));
|
||||
assertThat(transportAddresses, hasSize(limitPortCounts));
|
||||
final Set<Integer> ports = new HashSet<>();
|
||||
for (final TransportAddress address : transportAddresses) {
|
||||
assertTrue(address.address().getAddress().isLoopbackAddress());
|
||||
ports.add(address.getPort());
|
||||
}
|
||||
assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet())));
|
||||
}
|
||||
|
||||
public void testRemovingLocalAddresses() {
|
||||
final NetworkService networkService = new NetworkService(Collections.emptyList());
|
||||
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
|
||||
|
@ -219,8 +178,9 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
|
||||
executorService,
|
||||
logger,
|
||||
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
|
||||
10,
|
||||
IntStream.range(9300, 9310)
|
||||
.mapToObj(port -> NetworkAddress.format(loopbackAddress) + ":" + port)
|
||||
.collect(Collectors.toList()),
|
||||
transportService,
|
||||
TimeValue.timeValueSeconds(30));
|
||||
assertThat(transportAddresses, hasSize(7));
|
||||
|
@ -255,7 +215,7 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
throw unknownHostException;
|
||||
}
|
||||
|
||||
|
@ -271,7 +231,6 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
executorService,
|
||||
logger,
|
||||
Arrays.asList(hostname),
|
||||
1,
|
||||
transportService,
|
||||
TimeValue.timeValueSeconds(30)
|
||||
);
|
||||
|
@ -302,7 +261,7 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
if ("hostname1".equals(address)) {
|
||||
return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)};
|
||||
} else if ("hostname2".equals(address)) {
|
||||
|
@ -330,7 +289,6 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
executorService,
|
||||
logger,
|
||||
Arrays.asList("hostname1", "hostname2"),
|
||||
1,
|
||||
transportService,
|
||||
resolveTimeout);
|
||||
|
||||
|
@ -373,7 +331,6 @@ public class SeedHostsResolverTests extends ESTestCase {
|
|||
executorService,
|
||||
logger,
|
||||
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
|
||||
1,
|
||||
transportService,
|
||||
TimeValue.timeValueSeconds(30));
|
||||
assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
|
||||
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -38,18 +37,15 @@ public class SettingsBasedSeedHostsProviderTests extends ESTestCase {
|
|||
|
||||
private class AssertingHostsResolver implements HostsResolver {
|
||||
private final Set<String> expectedHosts;
|
||||
private final int expectedPortCount;
|
||||
|
||||
private boolean resolvedHosts;
|
||||
|
||||
AssertingHostsResolver(int expectedPortCount, String... expectedHosts) {
|
||||
this.expectedPortCount = expectedPortCount;
|
||||
AssertingHostsResolver(String... expectedHosts) {
|
||||
this.expectedHosts = Sets.newHashSet(expectedHosts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts) {
|
||||
assertEquals(expectedPortCount, limitPortCounts);
|
||||
public List<TransportAddress> resolveHosts(List<String> hosts) {
|
||||
assertEquals(expectedHosts, Sets.newHashSet(hosts));
|
||||
resolvedHosts = true;
|
||||
return emptyList();
|
||||
|
@ -61,15 +57,19 @@ public class SettingsBasedSeedHostsProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testScansPortsByDefault() {
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver(5, "::1", "127.0.0.1");
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver(
|
||||
"[::1]:9300", "[::1]:9301", "127.0.0.1:9300", "127.0.0.1:9301"
|
||||
);
|
||||
final TransportService transportService = mock(TransportService.class);
|
||||
when(transportService.getLocalAddresses()).thenReturn(Arrays.asList("::1", "127.0.0.1"));
|
||||
when(transportService.getDefaultSeedAddresses()).thenReturn(
|
||||
Arrays.asList("[::1]:9300", "[::1]:9301", "127.0.0.1:9300", "127.0.0.1:9301")
|
||||
);
|
||||
new SettingsBasedSeedHostsProvider(Settings.EMPTY, transportService).getSeedAddresses(hostsResolver);
|
||||
assertTrue(hostsResolver.getResolvedHosts());
|
||||
}
|
||||
|
||||
public void testGetsHostsFromSetting() {
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver(1, "bar", "foo");
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver("bar", "foo");
|
||||
new SettingsBasedSeedHostsProvider(Settings.builder()
|
||||
.putList(SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING.getKey(), "foo", "bar")
|
||||
.build(), null).getSeedAddresses(hostsResolver);
|
||||
|
@ -77,7 +77,7 @@ public class SettingsBasedSeedHostsProviderTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testGetsHostsFromLegacySetting() {
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver(1, "bar", "foo");
|
||||
final AssertingHostsResolver hostsResolver = new AssertingHostsResolver("bar", "foo");
|
||||
new SettingsBasedSeedHostsProvider(Settings.builder()
|
||||
.putList(SettingsBasedSeedHostsProvider.LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey(), "foo", "bar")
|
||||
.build(), null).getSeedAddresses(hostsResolver);
|
||||
|
|
|
@ -19,14 +19,25 @@
|
|||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.hamcrest.Matcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StreamCorruptedException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
||||
|
||||
/** Unit tests for {@link TcpTransport} */
|
||||
|
@ -34,50 +45,26 @@ public class TcpTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv4 host with a default port works */
|
||||
public void testParseV4DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", 1234);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with a default port range works */
|
||||
public void testParseV4DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
assertEquals("127.0.0.1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with port works */
|
||||
public void testParseV4WithPort() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345", 1234);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(2345, addresses[0].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv4 host with port range works */
|
||||
public void testParseV4WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("127.0.0.1:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("127.0.0.1", addresses[0].getAddress());
|
||||
assertEquals(2345, addresses[0].getPort());
|
||||
|
||||
assertEquals("127.0.0.1", addresses[1].getAddress());
|
||||
assertEquals(2346, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test unbracketed ipv6 hosts in configuration fail. Leave no ambiguity */
|
||||
public void testParseV6UnBracketed() throws Exception {
|
||||
try {
|
||||
TcpTransport.parse("::1", "1234", Integer.MAX_VALUE);
|
||||
TcpTransport.parse("::1", 1234);
|
||||
fail("should have gotten exception");
|
||||
} catch (IllegalArgumentException expected) {
|
||||
assertTrue(expected.getMessage().contains("must be bracketed"));
|
||||
|
@ -86,53 +73,107 @@ public class TcpTransportTests extends ESTestCase {
|
|||
|
||||
/** Test ipv6 host with a default port works */
|
||||
public void testParseV6DefaultPort() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", 1234);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with a default port range works */
|
||||
public void testParseV6DefaultRange() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]", "1234-1235", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(1234, addresses[0].getPort());
|
||||
|
||||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(1235, addresses[1].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with port works */
|
||||
public void testParseV6WithPort() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", "1234", Integer.MAX_VALUE);
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345", 1234);
|
||||
assertEquals(1, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(2345, addresses[0].getPort());
|
||||
}
|
||||
|
||||
/** Test ipv6 host with port range works */
|
||||
public void testParseV6WithPortRange() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:2345-2346", "1234", Integer.MAX_VALUE);
|
||||
assertEquals(2, addresses.length);
|
||||
|
||||
assertEquals("::1", addresses[0].getAddress());
|
||||
assertEquals(2345, addresses[0].getPort());
|
||||
|
||||
assertEquals("::1", addresses[1].getAddress());
|
||||
assertEquals(2346, addresses[1].getPort());
|
||||
public void testRejectsPortRanges() {
|
||||
expectThrows(
|
||||
NumberFormatException.class,
|
||||
() -> TcpTransport.parse("[::1]:100-200", 1000)
|
||||
);
|
||||
}
|
||||
|
||||
/** Test per-address limit */
|
||||
public void testAddressLimit() throws Exception {
|
||||
TransportAddress[] addresses = TcpTransport.parse("[::1]:100-200", "1000", 3);
|
||||
assertEquals(3, addresses.length);
|
||||
assertEquals(100, addresses[0].getPort());
|
||||
assertEquals(101, addresses[1].getPort());
|
||||
assertEquals(102, addresses[2].getPort());
|
||||
public void testDefaultSeedAddressesWithDefaultPort() {
|
||||
testDefaultSeedAddresses(Settings.EMPTY, containsInAnyOrder(
|
||||
"[::1]:9300", "[::1]:9301", "[::1]:9302", "[::1]:9303", "[::1]:9304", "[::1]:9305",
|
||||
"127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302", "127.0.0.1:9303", "127.0.0.1:9304", "127.0.0.1:9305"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesWithNonstandardGlobalPortRange() {
|
||||
testDefaultSeedAddresses(Settings.builder().put(TransportSettings.PORT.getKey(), "9500-9600").build(), containsInAnyOrder(
|
||||
"[::1]:9500", "[::1]:9501", "[::1]:9502", "[::1]:9503", "[::1]:9504", "[::1]:9505",
|
||||
"127.0.0.1:9500", "127.0.0.1:9501", "127.0.0.1:9502", "127.0.0.1:9503", "127.0.0.1:9504", "127.0.0.1:9505"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesWithSmallGlobalPortRange() {
|
||||
testDefaultSeedAddresses(Settings.builder().put(TransportSettings.PORT.getKey(), "9300-9302").build(), containsInAnyOrder(
|
||||
"[::1]:9300", "[::1]:9301", "[::1]:9302",
|
||||
"127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesWithNonstandardProfilePortRange() {
|
||||
testDefaultSeedAddresses(Settings.builder()
|
||||
.put(TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(TransportSettings.DEFAULT_PROFILE).getKey(), "9500-9600")
|
||||
.build(),
|
||||
containsInAnyOrder(
|
||||
"[::1]:9500", "[::1]:9501", "[::1]:9502", "[::1]:9503", "[::1]:9504", "[::1]:9505",
|
||||
"127.0.0.1:9500", "127.0.0.1:9501", "127.0.0.1:9502", "127.0.0.1:9503", "127.0.0.1:9504", "127.0.0.1:9505"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesWithSmallProfilePortRange() {
|
||||
testDefaultSeedAddresses(Settings.builder()
|
||||
.put(TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(TransportSettings.DEFAULT_PROFILE).getKey(), "9300-9302")
|
||||
.build(),
|
||||
containsInAnyOrder(
|
||||
"[::1]:9300", "[::1]:9301", "[::1]:9302",
|
||||
"127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesPrefersProfileSettingToGlobalSetting() {
|
||||
testDefaultSeedAddresses(Settings.builder()
|
||||
.put(TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(TransportSettings.DEFAULT_PROFILE).getKey(), "9300-9302")
|
||||
.put(TransportSettings.PORT.getKey(), "9500-9600")
|
||||
.build(),
|
||||
containsInAnyOrder(
|
||||
"[::1]:9300", "[::1]:9301", "[::1]:9302",
|
||||
"127.0.0.1:9300", "127.0.0.1:9301", "127.0.0.1:9302"));
|
||||
}
|
||||
|
||||
public void testDefaultSeedAddressesWithNonstandardSinglePort() {
|
||||
testDefaultSeedAddresses(Settings.builder().put(TransportSettings.PORT.getKey(), "9500").build(),
|
||||
containsInAnyOrder("[::1]:9500", "127.0.0.1:9500"));
|
||||
}
|
||||
|
||||
private void testDefaultSeedAddresses(final Settings settings, Matcher<Iterable<? extends String>> seedAddressesMatcher) {
|
||||
final TestThreadPool testThreadPool = new TestThreadPool("test");
|
||||
try {
|
||||
final TcpTransport tcpTransport = new TcpTransport(settings, Version.CURRENT, testThreadPool,
|
||||
new MockPageCacheRecycler(settings),
|
||||
new NoneCircuitBreakerService(), writableRegistry(), new NetworkService(Collections.emptyList())) {
|
||||
|
||||
@Override
|
||||
protected TcpServerChannel bind(String name, InetSocketAddress address) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TcpChannel initiateChannel(DiscoveryNode node) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void stopInternal() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
||||
assertThat(tcpTransport.getDefaultSeedAddresses(), seedAddressesMatcher);
|
||||
} finally {
|
||||
testThreadPool.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDecodeWithIncompleteHeader() throws IOException {
|
||||
|
|
|
@ -208,7 +208,7 @@ public class MockTransport implements Transport, LifecycleComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
|
||||
public TransportAddress[] addressesFromString(String address) {
|
||||
return new TransportAddress[0];
|
||||
}
|
||||
|
||||
|
@ -238,7 +238,7 @@ public class MockTransport implements Transport, LifecycleComponent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
|
|
|
@ -118,13 +118,13 @@ public final class StubbableTransport implements Transport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
|
||||
return delegate.addressesFromString(address, perAddressLimit);
|
||||
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
|
||||
return delegate.addressesFromString(address);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLocalAddresses() {
|
||||
return delegate.getLocalAddresses();
|
||||
public List<String> getDefaultSeedAddresses() {
|
||||
return delegate.getDefaultSeedAddresses();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue