Allow multiple unicast host providers (#31509)

Introduces support for multiple host providers, which allows the settings based hosts resolver to be
treated just as any other UnicastHostsProvider. Also introduces the notion of a HostsResolver so
that plugins such as FileBasedDiscovery do not need to create their own thread pool for resolving
hosts, making it easier to add new similar kind of plugins.
This commit is contained in:
Yannick Welsch 2018-06-22 15:31:23 +02:00 committed by GitHub
parent 8ae2049889
commit f22f91c57a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 224 additions and 178 deletions

View File

@ -132,7 +132,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
if (refreshInterval.millis() != 0) {
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {

View File

@ -92,7 +92,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
return dynamicHosts.getOrRefresh();
}

View File

@ -93,7 +93,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
@ -307,7 +307,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(3));
}
@ -324,12 +324,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicHosts();
provider.buildDynamicHosts(null);
}
assertThat(provider.fetchCount, is(2));
}

View File

@ -19,35 +19,17 @@
package org.elasticsearch.discovery.file;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
@ -57,47 +39,19 @@ import java.util.function.Supplier;
*/
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);
private final Settings settings;
private final Path configPath;
private ExecutorService fileBasedDiscoveryExecutorService;
public FileBasedDiscoveryPlugin(Settings settings, Path configPath) {
this.settings = settings;
this.configPath = configPath;
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]");
fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
Node.NODE_NAME_SETTING.get(settings) + "/" + "file_based_discovery_resolve",
0,
concurrentConnects,
60,
TimeUnit.SECONDS,
threadFactory,
threadPool.getThreadContext());
return Collections.emptyList();
}
@Override
public void close() throws IOException {
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
}
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(
"file",
() -> new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, fileBasedDiscoveryExecutorService));
() -> new FileBasedUnicastHostsProvider(new Environment(settings, configPath)));
}
}

View File

@ -23,26 +23,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
/**
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
* from {@link #UNICAST_HOSTS_FILE}.
@ -59,23 +52,15 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
private final TransportService transportService;
private final ExecutorService executorService;
private final Path unicastHostsFilePath;
private final TimeValue resolveTimeout;
FileBasedUnicastHostsProvider(Environment environment, TransportService transportService, ExecutorService executorService) {
FileBasedUnicastHostsProvider(Environment environment) {
super(environment.settings());
this.transportService = transportService;
this.executorService = executorService;
this.unicastHostsFilePath = environment.configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
List<String> hostsList;
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
@ -90,21 +75,8 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
hostsList = Collections.emptyList();
}
final List<TransportAddress> dynamicHosts = new ArrayList<>();
try {
dynamicHosts.addAll(resolveHostsLists(
executorService,
logger,
hostsList,
1,
transportService,
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
final List<TransportAddress> dynamicHosts = hostsResolver.resolveHosts(hostsList, 1);
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);
return dynamicHosts;
}

View File

@ -24,7 +24,9 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -123,8 +125,10 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
final Environment environment = TestEnvironment.newEnvironment(settings);
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
final List<TransportAddress> addresses = provider.buildDynamicHosts();
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment);
final List<TransportAddress> addresses = provider.buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
assertEquals(0, addresses.size());
}
@ -163,6 +167,8 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
}
return new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
new Environment(settings, configPath)).buildDynamicHosts((hosts, limitPortCounts) ->
UnicastZenPing.resolveHostsLists(executorService, logger, hosts, limitPortCounts, transportService,
TimeValue.timeValueSeconds(10)));
}
}

View File

@ -93,7 +93,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
* Information can be cached using `cloud.gce.refresh_interval` property if needed.
*/
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
// We check that needed properties have been set
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +

View File

@ -108,7 +108,7 @@ public class GceDiscoveryTests extends ESTestCase {
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
transportService, new NetworkService(Collections.emptyList()));
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts(null);
logger.info("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.env.Environment;
@ -357,7 +358,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ZenDiscovery.MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING,
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
ZenDiscovery.MAX_PENDING_CLUSTER_STATES_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,

View File

@ -31,7 +31,9 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.single.SingleNodeDiscovery;
import org.elasticsearch.discovery.zen.SettingsBasedHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
@ -42,13 +44,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A module for loading classes for node discovery.
@ -57,8 +61,8 @@ public class DiscoveryModule {
public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
public static final Setting<List<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
Setting.listSetting("discovery.zen.hosts_provider", Collections.emptyList(), Function.identity(), Property.NodeScope);
private final Discovery discovery;
@ -66,9 +70,9 @@ public class DiscoveryModule {
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService) {
final UnicastHostsProvider hostsProvider;
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
final Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedHostsProvider(settings, transportService));
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
@ -80,17 +84,32 @@ public class DiscoveryModule {
joinValidators.add(joinValidator);
}
}
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
if (hostsProviderName.isPresent()) {
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
if (hostsProviderSupplier == null) {
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
}
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
} else {
hostsProvider = Collections::emptyList;
List<String> hostsProviderNames = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
// for bwc purposes, add settings provider even if not explicitly specified
if (hostsProviderNames.contains("settings") == false) {
List<String> extendedHostsProviderNames = new ArrayList<>();
extendedHostsProviderNames.add("settings");
extendedHostsProviderNames.addAll(hostsProviderNames);
hostsProviderNames = extendedHostsProviderNames;
}
final Set<String> missingProviderNames = new HashSet<>(hostsProviderNames);
missingProviderNames.removeAll(hostProviders.keySet());
if (missingProviderNames.isEmpty() == false) {
throw new IllegalArgumentException("Unknown zen hosts providers " + missingProviderNames);
}
List<UnicastHostsProvider> filteredHostsProviders = hostsProviderNames.stream()
.map(hostProviders::get).map(Supplier::get).collect(Collectors.toList());
final UnicastHostsProvider hostsProvider = hostsResolver -> {
final List<TransportAddress> addresses = new ArrayList<>();
for (UnicastHostsProvider provider : filteredHostsProviders) {
addresses.addAll(provider.buildDynamicHosts(hostsResolver));
}
return Collections.unmodifiableList(addresses);
};
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.function.Function;
import static java.util.Collections.emptyList;
/**
* An implementation of {@link UnicastHostsProvider} that reads hosts/ports
* from the "discovery.zen.ping.unicast.hosts" node setting. If the port is
* left off an entry, a default port of 9300 is assumed.
*
* An example unicast hosts setting might look as follows:
* [67.81.244.10, 67.81.244.11:9305, 67.81.244.15:9400]
*/
public class SettingsBasedHostsProvider extends AbstractComponent implements UnicastHostsProvider {
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Setting.Property.NodeScope);
// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
private final List<String> configuredHosts;
private final int limitPortCounts;
public SettingsBasedHostsProvider(Settings settings, TransportService transportService) {
super(settings);
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
// we only limit to 1 address, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
} else {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
}
logger.debug("using initial hosts {}", configuredHosts);
}
@Override
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
return hostsResolver.resolveHosts(configuredHosts, limitPortCounts);
}
}

View File

@ -31,5 +31,15 @@ public interface UnicastHostsProvider {
/**
* Builds the dynamic list of unicast hosts to be used for unicast discovery.
*/
List<TransportAddress> buildDynamicHosts();
List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver);
/**
* Helper object that allows to resolve a list of hosts to a list of transport addresses.
* Each host is resolved into a transport address (or a collection of addresses if the
* number of ports is greater than one)
*/
interface HostsResolver {
List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts);
}
}

View File

@ -82,11 +82,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
@ -94,26 +92,15 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public class UnicastZenPing extends AbstractComponent implements ZenPing {
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(),
Property.NodeScope);
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope);
public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope);
// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private final List<String> configuredHosts;
private final int limitPortCounts;
private final PingContextProvider contextProvider;
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
@ -141,19 +128,10 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
this.contextProvider = contextProvider;
final int concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
// we only limit to 1 addresses, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
} else {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
}
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
logger.debug(
"using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]",
configuredHosts,
"using concurrent_connects [{}], resolve_timeout [{}]",
concurrentConnects,
resolveTimeout);
@ -172,9 +150,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
/**
* Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
* if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done
* in parallel using specified executor service up to the specified resolve timeout.
* Resolves a list of hosts to a list of transport addresses. Each host is resolved into a transport address (or a collection of
* addresses if the number of ports is greater than one). Host lookups are done in parallel using specified executor service up
* to the specified resolve timeout.
*
* @param executorService the executor service used to parallelize hostname lookups
* @param logger logger used for logging messages regarding hostname lookups
@ -190,7 +168,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final List<String> hosts,
final int limitPortCounts,
final TransportService transportService,
final TimeValue resolveTimeout) throws InterruptedException {
final TimeValue resolveTimeout) {
Objects.requireNonNull(executorService);
Objects.requireNonNull(logger);
Objects.requireNonNull(hosts);
@ -205,8 +183,13 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
.stream()
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List<Future<TransportAddress[]>> futures;
try {
futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Collections.emptyList();
}
final List<TransportAddress> transportAddresses = new ArrayList<>();
final Set<TransportAddress> localAddresses = new HashSet<>();
localAddresses.add(transportService.boundAddress().publishAddress());
@ -232,6 +215,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
assert e.getCause() != null;
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} else {
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
@ -240,6 +226,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
return Collections.unmodifiableList(transportAddresses);
}
private UnicastHostsProvider.HostsResolver createHostsResolver() {
return (hosts, limitPortCounts) -> resolveHostsLists(unicastZenPingExecutorService, logger, hosts,
limitPortCounts, transportService, resolveTimeout);
}
@Override
public void close() {
ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
@ -281,18 +272,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final TimeValue scheduleDuration,
final TimeValue requestDuration) {
final List<TransportAddress> seedAddresses = new ArrayList<>();
try {
seedAddresses.addAll(resolveHostsLists(
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
seedAddresses.addAll(hostsProvider.buildDynamicHosts());
seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver()));
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {

View File

@ -137,11 +137,10 @@ public class DiscoveryModuleTests extends ESTestCase {
public void testHostsProvider() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "custom").build();
final UnicastHostsProvider provider = Collections::emptyList;
AtomicBoolean created = new AtomicBoolean(false);
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> {
created.set(true);
return Collections::emptyList;
return hostsResolver -> Collections.emptyList();
});
newModule(settings, Collections.singletonList(plugin));
assertTrue(created.get());
@ -151,7 +150,7 @@ public class DiscoveryModuleTests extends ESTestCase {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(settings, Collections.emptyList()));
assertEquals("Unknown zen hosts provider [dne]", e.getMessage());
assertEquals("Unknown zen hosts providers [dne]", e.getMessage());
}
public void testDuplicateHostsProvider() {
@ -162,6 +161,37 @@ public class DiscoveryModuleTests extends ESTestCase {
assertEquals("Cannot register zen hosts provider [dup] twice", e.getMessage());
}
public void testSettingsHostsProvider() {
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("settings", () -> null);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(Settings.EMPTY, Arrays.asList(plugin)));
assertEquals("Cannot register zen hosts provider [settings] twice", e.getMessage());
}
public void testMultiHostsProvider() {
AtomicBoolean created1 = new AtomicBoolean(false);
DummyHostsProviderPlugin plugin1 = () -> Collections.singletonMap("provider1", () -> {
created1.set(true);
return hostsResolver -> Collections.emptyList();
});
AtomicBoolean created2 = new AtomicBoolean(false);
DummyHostsProviderPlugin plugin2 = () -> Collections.singletonMap("provider2", () -> {
created2.set(true);
return hostsResolver -> Collections.emptyList();
});
AtomicBoolean created3 = new AtomicBoolean(false);
DummyHostsProviderPlugin plugin3 = () -> Collections.singletonMap("provider3", () -> {
created3.set(true);
return hostsResolver -> Collections.emptyList();
});
Settings settings = Settings.builder().putList(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(),
"provider1", "provider3").build();
newModule(settings, Arrays.asList(plugin1, plugin2, plugin3));
assertTrue(created1.get());
assertFalse(created2.get());
assertTrue(created3.get());
}
public void testLazyConstructionHostsProvider() {
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
() -> {

View File

@ -84,7 +84,7 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
internalCluster().getInstance(TransportService.class);
// try to ping the single node directly
final UnicastHostsProvider provider =
() -> Collections.singletonList(nodeTransport.getLocalNode().getAddress());
hostsResolver -> Collections.singletonList(nodeTransport.getLocalNode().getAddress());
final CountDownLatch latch = new CountDownLatch(1);
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(nodeTransport.getLocalNode())

View File

@ -137,8 +137,6 @@ public class UnicastZenPingTests extends ESTestCase {
}
}
private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList;
public void testSimplePings() throws IOException, InterruptedException, ExecutionException {
// use ephemeral ports
final Settings settings = Settings.builder().put("cluster.name", "test").put(TcpTransport.PORT.getKey(), 0).build();
@ -182,7 +180,7 @@ public class UnicastZenPingTests extends ESTestCase {
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
final ClusterState stateMismatch = ClusterState.builder(new ClusterName("mismatch")).version(randomNonNegativeLong()).build();
Settings hostsSettings = Settings.builder()
final Settings hostsSettings = Settings.builder()
.putList("discovery.zen.ping.unicast.hosts",
NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())),
@ -196,22 +194,21 @@ public class UnicastZenPingTests extends ESTestCase {
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
ClusterState stateC = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC,
EMPTY_HOSTS_PROVIDER, () -> stateC) {
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleC, () -> stateC) {
@Override
protected Version getVersion() {
return versionD;
@ -223,8 +220,7 @@ public class UnicastZenPingTests extends ESTestCase {
ClusterState stateD = ClusterState.builder(stateMismatch)
.nodes(DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D"))
.build();
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD,
EMPTY_HOSTS_PROVIDER, () -> stateD);
TestUnicastZenPing zenPingD = new TestUnicastZenPing(hostsSettingsMismatch, threadPool, handleD, () -> stateD);
zenPingD.start();
closeables.push(zenPingD);
@ -329,21 +325,21 @@ public class UnicastZenPingTests extends ESTestCase {
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A"))
.build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
ClusterState stateC = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C"))
.build();
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, EMPTY_HOSTS_PROVIDER, () -> stateC);
TestUnicastZenPing zenPingC = new TestUnicastZenPing(hostsSettings, threadPool, handleC, () -> stateC);
zenPingC.start();
closeables.push(zenPingC);
@ -408,7 +404,7 @@ public class UnicastZenPingTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final int limitPortCounts = randomIntBetween(1, 10);
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = UnicastZenPing.resolveHostsLists(
executorService,
logger,
Collections.singletonList("127.0.0.1"),
@ -452,7 +448,7 @@ public class UnicastZenPingTests extends ESTestCase {
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = UnicastZenPing.resolveHostsLists(
executorService,
logger,
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
@ -503,7 +499,7 @@ public class UnicastZenPingTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = UnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList(hostname),
@ -562,7 +558,7 @@ public class UnicastZenPingTests extends ESTestCase {
closeables.push(transportService);
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
try {
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = UnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList("hostname1", "hostname2"),
@ -610,6 +606,7 @@ public class UnicastZenPingTests extends ESTestCase {
hostsSettingsBuilder.put("discovery.zen.ping.unicast.hosts", (String) null);
}
final Settings hostsSettings = hostsSettingsBuilder.build();
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
// connection to reuse
@ -627,14 +624,14 @@ public class UnicastZenPingTests extends ESTestCase {
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A"))
.build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
final ClusterState stateB = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B"))
.build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
@ -669,19 +666,20 @@ public class UnicastZenPingTests extends ESTestCase {
.put("cluster.name", "test")
.put("discovery.zen.ping.unicast.hosts", (String) null) // use nodes for simplicity
.build();
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong()).build();
final ClusterState stateA = ClusterState.builder(state)
.blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
.nodes(DiscoveryNodes.builder().add(handleA.node).add(handleB.node).localNodeId("UZP_A")).build();
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, EMPTY_HOSTS_PROVIDER, () -> stateA);
final TestUnicastZenPing zenPingA = new TestUnicastZenPing(hostsSettings, threadPool, handleA, () -> stateA);
zenPingA.start();
closeables.push(zenPingA);
// Node B doesn't know about A!
final ClusterState stateB = ClusterState.builder(state).nodes(
DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B")).build();
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, EMPTY_HOSTS_PROVIDER, () -> stateB);
TestUnicastZenPing zenPingB = new TestUnicastZenPing(hostsSettings, threadPool, handleB, () -> stateB);
zenPingB.start();
closeables.push(zenPingB);
@ -728,7 +726,7 @@ public class UnicastZenPingTests extends ESTestCase {
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = UnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
@ -828,9 +826,10 @@ public class UnicastZenPingTests extends ESTestCase {
private static class TestUnicastZenPing extends UnicastZenPing {
TestUnicastZenPing(Settings settings, ThreadPool threadPool, NetworkHandle networkHandle,
UnicastHostsProvider unicastHostsProvider, PingContextProvider contextProvider) {
PingContextProvider contextProvider) {
super(Settings.builder().put("node.name", networkHandle.node.getName()).put(settings).build(),
threadPool, networkHandle.transportService, unicastHostsProvider, contextProvider);
threadPool, networkHandle.transportService,
new SettingsBasedHostsProvider(settings, networkHandle.transportService), contextProvider);
}
volatile CountDownLatch allTasksCompleted;

View File

@ -317,7 +317,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(),
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), ESAllocationTestCase.createAllocationService(),
Collections.emptyList());
zenDiscovery.start();
return zenDiscovery;

View File

@ -56,7 +56,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos
}
@Override
public List<TransportAddress> buildDynamicHosts() {
public List<TransportAddress> buildDynamicHosts(HostsResolver hostsResolver) {
final DiscoveryNode localNode = getNode();
assert localNode != null;
synchronized (activeNodesPerCluster) {

View File

@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
/**
* A alternative zen discovery which allows using mocks for things like pings, as well as
@ -84,7 +84,7 @@ public class TestZenDiscovery extends ZenDiscovery {
final Supplier<UnicastHostsProvider> supplier;
if (USE_MOCK_PINGS.get(settings)) {
// we have to return something in order for the unicast host provider setting to resolve to something. It will never be used
supplier = () -> () -> {
supplier = () -> hostsResolver -> {
throw new UnsupportedOperationException();
};
} else {