Lazy resolve unicast hosts

Today we eagerly resolve unicast hosts. This means that if DNS changes,
we will never find the host at the new address. Moreover, a single host
failing to resolve causes startup to abort. This commit introduces lazy
resolution of unicast hosts. If a DNS entry changes, there is an
opportunity for the host to be discovered. Note that under the Java
security manager, there is a default positive cache of infinity for
resolved hosts; this means that if a user does want to operate in an
environment where DNS can change, they must adjust
networkaddress.cache.ttl in their security policy. And if a host fails
to resolve, we log the hostname at the warn level but continue pinging
the other configured hosts.

When doing DNS resolutions for unicast hostnames, we wait until the DNS
lookups time out. This appears to be forty-five seconds on modern JVMs,
and it is not configurable. If we do these serially, the cluster can be
blocked during ping for a lengthy period of time. This commit introduces
doing the DNS lookups in parallel, and adds a user-configurable timeout
for these lookups.

Relates #21630
This commit is contained in:
Jason Tedor 2016-11-22 14:17:04 -05:00 committed by GitHub
parent 3ff8faf514
commit 9dc65037bc
15 changed files with 672 additions and 141 deletions

View File

@ -339,6 +339,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING,
UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT,
SearchService.DEFAULT_KEEPALIVE_SETTING,
SearchService.KEEPALIVE_INTERVAL_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
@ -61,12 +62,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -74,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -89,6 +96,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
Property.NodeScope);
public static final Setting<Integer> DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING =
Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope);
public static final Setting<TimeValue> DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT =
Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope);
// these limits are per-address
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
@ -100,7 +109,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final int concurrentConnects;
private final DiscoveryNode[] configuredTargetNodes;
private final List<String> configuredHosts;
private final int limitPortCounts;
private volatile PingContextProvider contextProvider;
@ -114,12 +125,14 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final Map<Integer, SendPingsHandler> receivedResponses = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
// a list of temporal responses a node will return for a request (holds responses from other nodes)
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
private final UnicastHostsProvider hostsProvider;
private final ExecutorService unicastConnectExecutor;
private final ExecutorService unicastZenPingExecutorService;
private final TimeValue resolveTimeout;
private volatile boolean closed = false;
@ -132,62 +145,110 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
this.hostsProvider = unicastHostsProvider;
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
final int limitPortCounts;
final List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
if (hosts.isEmpty()) {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
hosts.addAll(transportService.getLocalAddresses());
} else {
configuredHosts = hosts;
// we limit to 1 port per address; it makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
}
logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects);
List<DiscoveryNode> configuredTargetNodes = new ArrayList<>();
for (final String host : hosts) {
configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#"));
}
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);
resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
logger.debug(
"using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]",
configuredHosts,
concurrentConnects,
resolveTimeout);
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME,
new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS,
threadFactory, threadPool.getThreadContext());
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastZenPingExecutorService = EsExecutors.newScaling(
"unicast_connect",
0, concurrentConnects,
60,
TimeUnit.SECONDS,
threadFactory,
threadPool.getThreadContext());
}
/**
* Resolves a host to a list of discovery nodes. The host is resolved into a transport
* address (or a collection of addresses if the number of ports is greater than one) and
* the transport addresses are used to create discovery nodes.
* Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses
* if the number of ports is greater than one) and the transport addresses are used to create discovery nodes. Host lookups are done
* in parallel using the specified executor service up to the specified resolve timeout.
*
* @param host the host to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param executorService the executor service used to parallelize hostname lookups
* @param logger logger used for logging messages regarding hostname lookups
* @param hosts the hosts to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param transportService the transport service
* @param idGenerator the generator to supply unique ids for each discovery node
* @param idGenerator the generator to supply unique ids for each discovery node
* @param resolveTimeout the timeout before returning from hostname lookups
* @return a list of discovery nodes with resolved transport addresses
*/
public static List<DiscoveryNode> resolveDiscoveryNodes(final String host, final int limitPortCounts,
final TransportService transportService, final Supplier<String> idGenerator) {
List<DiscoveryNode> discoveryNodes = new ArrayList<>();
try {
TransportAddress[] addresses = transportService.addressesFromString(host, limitPortCounts);
for (TransportAddress address : addresses) {
discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
public static List<DiscoveryNode> resolveDiscoveryNodes(
final ExecutorService executorService,
final Logger logger,
final List<String> hosts,
final int limitPortCounts,
final TransportService transportService,
final Supplier<String> idGenerator,
final TimeValue resolveTimeout) throws InterruptedException {
Objects.requireNonNull(executorService);
Objects.requireNonNull(logger);
Objects.requireNonNull(hosts);
Objects.requireNonNull(transportService);
Objects.requireNonNull(idGenerator);
Objects.requireNonNull(resolveTimeout);
if (resolveTimeout.nanos() < 0) {
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
}
// create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
final List<Callable<TransportAddress[]>> callables =
hosts
.stream()
.map(hn -> (Callable<TransportAddress[]>)() -> transportService.addressesFromString(hn, limitPortCounts))
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator<String> it = hosts.iterator();
for (final Future<TransportAddress[]> future : futures) {
final String hostname = it.next();
if (!future.isCancelled()) {
assert future.isDone();
try {
final TransportAddress[] addresses = future.get();
logger.trace("resolved host [{}] to {}", hostname, addresses);
for (final TransportAddress address : addresses) {
discoveryNodes.add(
new DiscoveryNode(
idGenerator.get(),
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
} catch (final ExecutionException e) {
assert e.getCause() != null;
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, e.getCause());
}
} else {
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e);
}
return discoveryNodes;
}
@Override
public void close() {
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS);
Releasables.close(receivedResponses.values());
closed = true;
}
@ -220,27 +281,49 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
}
/**
* Sends three rounds of pings notifying the specified {@link PingListener} when pinging is complete. Pings are sent after resolving
* configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch
* of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}.
* The pings that are sent carry a timeout of 1.25 times the {@link TimeValue}.
*
* @param listener the callback when pinging is complete
* @param duration the timeout for various components of the pings
*/
@Override
public void ping(final PingListener listener, final TimeValue duration) {
final List<DiscoveryNode> resolvedDiscoveryNodes;
try {
resolvedDiscoveryNodes = resolveDiscoveryNodes(
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
() -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#",
resolveTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
try {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(duration, null, sendPingsHandler);
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(duration, null, sendPingsHandler);
sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes);
threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler);
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes);
sendPingsHandler.close();
listener.onPing(sendPingsHandler.pingCollection().toList());
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
@ -305,7 +388,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
void sendPings(
final TimeValue timeout,
@Nullable TimeValue waitTime,
final SendPingsHandler sendPingsHandler,
final List<DiscoveryNode> resolvedDiscoveryNodes) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
@ -330,8 +417,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet);
// new add the unicast targets first
List<DiscoveryNode> nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes);
// add the configured hosts first
final List<DiscoveryNode> nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size());
nodesToPing.addAll(resolvedDiscoveryNodes);
nodesToPing.addAll(sortedNodesToPing);
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
@ -369,7 +457,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
}
// fork the connection to another thread
final DiscoveryNode finalNodeToSend = nodeToSend;
unicastConnectExecutor.execute(new Runnable() {
unicastZenPingExecutorService.execute(new Runnable() {
@Override
public void run() {
if (sendPingsHandler.isClosed()) {

View File

@ -20,7 +20,6 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@ -715,7 +714,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
@ -53,7 +54,7 @@ public interface Transport extends LifecycleComponent {
/**
* Returns an address from its string representation.
*/
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception;
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;
/**
* Is the address type supported.

View File

@ -52,6 +52,7 @@ import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -617,7 +618,7 @@ public class TransportService extends AbstractLifecycleComponent {
return requestIds.getAndIncrement();
}
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
@ -133,7 +134,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
throw new UnsupportedOperationException();
}

View File

@ -42,6 +42,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -188,7 +189,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return new TransportAddress[0];
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.discovery.zen;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@ -34,53 +35,121 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.junit.After;
import org.junit.Before;
import org.mockito.Matchers;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class UnicastZenPingTests extends ESTestCase {
private ThreadPool threadPool;
private ExecutorService executorService;
// close in reverse order as opened
private Stack<Closeable> closeables;
/**
 * Creates the per-test thread pool, the executor service handed to tests that drive hostname
 * resolution directly, and the stack that records resources to close in {@code tearDown}.
 */
@Before
public void setUp() throws Exception {
    super.setUp();
    // both the thread pool and the executor (and its daemon threads) are named after the test class
    final String testClassName = getClass().getName();
    threadPool = new TestThreadPool(testClassName);
    executorService = EsExecutors.newScaling(
        testClassName,
        0,
        2,
        60,
        TimeUnit.SECONDS,
        EsExecutors.daemonThreadFactory("[" + testClassName + "]"),
        threadPool.getThreadContext());
    closeables = new Stack<>();
}
/**
 * Closes every resource registered on {@code closeables} in the reverse of the order in which it
 * was pushed, then terminates the executor service and the thread pool. The terminations and the
 * superclass tear-down run even if closing a resource throws.
 */
@After
public void tearDown() throws Exception {
    try {
        // Pop the entries into a list rather than iterating the stack itself: the JDK Stack does not iterate in the
        // expected (last-in, first-out) order (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4475301)
        final List<Closeable> lastOpenedFirst = new ArrayList<>();
        while (closeables.isEmpty() == false) {
            final Closeable mostRecent = closeables.pop();
            lastOpenedFirst.add(mostRecent);
        }
        IOUtils.close(lastOpenedFirst);
    } finally {
        terminate(executorService);
        terminate(threadPool);
        super.tearDown();
    }
}
private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList;
public void testSimplePings() throws IOException, InterruptedException {
int startPort = 11000 + randomIntBetween(0, 1000);
int endPort = startPort + 10;
Settings settings = Settings.builder()
.put("cluster.name", "test")
.put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build();
// use ephemeral ports
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
final Settings settingsMismatch =
Settings.builder().put(settings).put("cluster.name", "mismatch").put(TransportSettings.PORT.getKey(), 0).build();
Settings settingsMismatch = Settings.builder().put(settings)
.put("cluster.name", "mismatch")
.put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build();
ThreadPool threadPool = new TestThreadPool(getClass().getName());
NetworkService networkService = new NetworkService(settings, Collections.emptyList());
NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", Version.CURRENT);
NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", Version.CURRENT);
NetworkHandle handleC = startServices(settingsMismatch, threadPool, networkService, "UZP_C", Version.CURRENT);
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
s,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
v);
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
closeables.push(handleA.transportService);
NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier);
closeables.push(handleB.transportService);
NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier);
closeables.push(handleC.transportService);
// just fake that no versions are compatible with this node
Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion());
Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion);
NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD);
NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier);
closeables.push(handleD.transportService);
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
@ -106,6 +175,7 @@ public class UnicastZenPingTests extends ESTestCase {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
closeables.push(zenPingA);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@ -119,6 +189,7 @@ public class UnicastZenPingTests extends ESTestCase {
return state;
}
});
closeables.push(zenPingB);
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) {
@Override
@ -137,6 +208,7 @@ public class UnicastZenPingTests extends ESTestCase {
return state;
}
});
closeables.push(zenPingC);
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER);
zenPingD.start(new PingContextProvider() {
@ -150,42 +222,319 @@ public class UnicastZenPingTests extends ESTestCase {
return state;
}
});
closeables.push(zenPingD);
try {
logger.info("ping from UZP_A");
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
logger.info("ping from UZP_A");
Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(1));
ZenPing.PingResponse ping = pingResponses.iterator().next();
assertThat(ping.node().getId(), equalTo("UZP_B"));
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
assertCounters(handleA, handleA, handleB, handleC, handleD);
// ping again, this time from B,
logger.info("ping from UZP_B");
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(1));
ping = pingResponses.iterator().next();
assertThat(ping.node().getId(), equalTo("UZP_A"));
assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION));
assertCounters(handleB, handleA, handleB, handleC, handleD);
logger.info("ping from UZP_C");
pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(0));
assertCounters(handleC, handleA, handleB, handleC, handleD);
logger.info("ping from UZP_D");
pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(0));
assertCounters(handleD, handleA, handleB, handleC, handleD);
}
public void testUnknownHostNotCached() {
// use ephemeral ports
final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
final Map<String, TransportAddress[]> addresses = new HashMap<>();
final BiFunction<Settings, Version, Transport> supplier = (s, v) -> new MockTcpTransport(
s,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
v) {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
final TransportAddress[] transportAddresses = addresses.get(address);
if (transportAddresses == null) {
throw new UnknownHostException(address);
} else {
return transportAddresses;
}
}
};
final NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
closeables.push(handleA.transportService);
final NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier);
closeables.push(handleB.transportService);
final NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier);
closeables.push(handleC.transportService);
addresses.put(
"UZP_A",
new TransportAddress[]{
new TransportAddress(
new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort()))});
addresses.put(
"UZP_C",
new TransportAddress[]{
new TransportAddress(
new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))});
final Settings hostsSettings = Settings.builder()
.putArray("discovery.zen.ping.unicast.hosts", "UZP_A", "UZP_B", "UZP_C")
.put("cluster.name", "test")
.build();
final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build();
final UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
zenPingA.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
}
@Override
public ClusterState clusterState() {
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
}
});
closeables.push(zenPingA);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
zenPingB.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
closeables.push(zenPingB);
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER);
zenPingC.start(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build();
}
@Override
public ClusterState clusterState() {
return state;
}
});
closeables.push(zenPingC);
// the presence of an unresolvable host should not prevent resolvable hosts from being pinged
{
final Collection<ZenPing.PingResponse> pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(3));
assertThat(pingResponses.size(), equalTo(1));
ZenPing.PingResponse ping = pingResponses.iterator().next();
assertThat(ping.node().getId(), equalTo("UZP_B"));
assertThat(ping.node().getId(), equalTo("UZP_C"));
assertThat(ping.getClusterStateVersion(), equalTo(state.version()));
assertCounters(handleA, handleA, handleB, handleC, handleD);
// ping again, this time from B,
logger.info("ping from UZP_B");
pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(1));
ping = pingResponses.iterator().next();
assertThat(ping.node().getId(), equalTo("UZP_A"));
assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION));
assertCounters(handleB, handleA, handleB, handleC, handleD);
logger.info("ping from UZP_C");
pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(0));
assertCounters(handleC, handleA, handleB, handleC, handleD);
logger.info("ping from UZP_D");
pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.size(), equalTo(0));
assertCounters(handleD, handleA, handleB, handleC, handleD);
} finally {
try {
IOUtils.close(zenPingA, zenPingB, zenPingC, zenPingD,
handleA.transportService, handleB.transportService, handleC.transportService, handleD.transportService);
} finally {
terminate(threadPool);
}
assertCounters(handleA, handleA, handleC);
assertNull(handleA.counters.get(handleB.address));
}
// now allow UZP_B to be resolvable
addresses.put(
"UZP_B",
new TransportAddress[]{
new TransportAddress(
new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))});
// now we should see pings to UZP_B; this establishes that host resolutions are not cached
{
// ping from C so that we can assert on the counters from a fresh source (as opposed to resetting them)
final Collection<ZenPing.PingResponse> secondPingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(3));
assertThat(secondPingResponses.size(), equalTo(2));
final Set<String> ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList()));
assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_A", "UZP_B"))));
assertCounters(handleC, handleA, handleB, handleC);
}
}
/**
 * Checks that resolving a single port-less loopback host produces exactly as many discovery nodes as
 * the per-address port limit allows, and that their ports are the consecutive values starting at 9300.
 */
public void testPortLimit() throws InterruptedException {
    final NetworkService netService = new NetworkService(Settings.EMPTY, Collections.emptyList());
    final Transport tcpTransport = new MockTcpTransport(
        Settings.EMPTY,
        threadPool,
        BigArrays.NON_RECYCLING_INSTANCE,
        new NoneCircuitBreakerService(),
        new NamedWriteableRegistry(Collections.emptyList()),
        netService,
        Version.CURRENT);
    closeables.push(tcpTransport);
    final TransportService service =
        new TransportService(Settings.EMPTY, tcpTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
    closeables.push(service);

    final AtomicInteger nextNodeId = new AtomicInteger();
    final int portLimit = randomIntBetween(1, 10);
    final List<DiscoveryNode> resolvedNodes = UnicastZenPing.resolveDiscoveryNodes(
        executorService,
        logger,
        Collections.singletonList("127.0.0.1"),
        portLimit,
        service,
        () -> Integer.toString(nextNodeId.incrementAndGet()),
        TimeValue.timeValueMillis(100));
    assertThat(resolvedNodes, hasSize(portLimit));

    // every resolved node must sit on the loopback address; collect the ports for the range check below
    final Set<Integer> seenPorts = new HashSet<>();
    for (final DiscoveryNode resolvedNode : resolvedNodes) {
        assertTrue(resolvedNode.getAddress().address().getAddress().isLoopbackAddress());
        seenPorts.add(resolvedNode.getAddress().getPort());
    }
    final Set<Integer> expectedPorts = IntStream.range(9300, 9300 + portLimit).boxed().collect(Collectors.toSet());
    assertThat(seenPorts, equalTo(expectedPorts));
}
/**
 * Checks that a host whose lookup throws {@link UnknownHostException} yields no discovery nodes and
 * that the failure is reported through a warning on the supplied logger.
 */
public void testUnknownHost() throws InterruptedException {
    final Logger mockLogger = mock(Logger.class);
    final NetworkService netService = new NetworkService(Settings.EMPTY, Collections.emptyList());
    final String unresolvableHost = randomAsciiOfLength(8);
    final UnknownHostException lookupFailure = new UnknownHostException(unresolvableHost);
    // a transport whose address lookup always fails with the exception prepared above
    final Transport failingTransport = new MockTcpTransport(
        Settings.EMPTY,
        threadPool,
        BigArrays.NON_RECYCLING_INSTANCE,
        new NoneCircuitBreakerService(),
        new NamedWriteableRegistry(Collections.emptyList()),
        netService,
        Version.CURRENT) {

        @Override
        public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
            throw lookupFailure;
        }

    };
    closeables.push(failingTransport);
    final TransportService service =
        new TransportService(Settings.EMPTY, failingTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
    closeables.push(service);

    final AtomicInteger nextNodeId = new AtomicInteger();
    final List<DiscoveryNode> resolvedNodes = UnicastZenPing.resolveDiscoveryNodes(
        executorService,
        mockLogger,
        Collections.singletonList(unresolvableHost),
        1,
        service,
        () -> Integer.toString(nextNodeId.incrementAndGet()),
        TimeValue.timeValueMillis(100));

    assertThat(resolvedNodes, empty());
    verify(mockLogger).warn("failed to resolve host [" + unresolvableHost + "]", lookupFailure);
}
/**
 * Checks that a host whose lookup does not finish within the resolve timeout is skipped with a
 * warning, while a host that resolves promptly is still returned.
 *
 * The stub transport resolves "hostname1" immediately and blocks "hostname2" on a latch that is only
 * released once the assertions have run.
 */
public void testResolveTimeout() throws InterruptedException {
    final Logger logger = mock(Logger.class);
    final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
    // counted down in the finally block below so the blocked "hostname2" lookup cannot wait forever
    final CountDownLatch latch = new CountDownLatch(1);
    final Transport transport = new MockTcpTransport(
        Settings.EMPTY,
        threadPool,
        BigArrays.NON_RECYCLING_INSTANCE,
        new NoneCircuitBreakerService(),
        new NamedWriteableRegistry(Collections.emptyList()),
        networkService,
        Version.CURRENT) {

        @Override
        public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
            if ("hostname1".equals(address)) {
                return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)};
            } else if ("hostname2".equals(address)) {
                try {
                    latch.await();
                    return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)};
                } catch (InterruptedException e) {
                    // restore the interrupt status before propagating so the interruption is not lost
                    // to code further up the stack of the thread running this lookup
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            } else {
                throw new UnknownHostException(address);
            }
        }

    };
    closeables.push(transport);

    final TransportService transportService =
        new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
    closeables.push(transportService);
    final AtomicInteger idGenerator = new AtomicInteger();
    final TimeValue resolveTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 100));
    try {
        final List<DiscoveryNode> discoveryNodes = UnicastZenPing.resolveDiscoveryNodes(
            executorService,
            logger,
            Arrays.asList("hostname1", "hostname2"),
            1,
            transportService,
            () -> Integer.toString(idGenerator.incrementAndGet()),
            resolveTimeout);

        // only "hostname1" resolves in time; "hostname2" must be reported as timed out and nothing else logged
        assertThat(discoveryNodes, hasSize(1));
        verify(logger).trace(
            "resolved host [{}] to {}", "hostname1",
            new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)});
        verify(logger).warn("timed out after [{}] resolving host [{}]", resolveTimeout, "hostname2");
        verifyNoMoreInteractions(logger);
    } finally {
        latch.countDown();
    }
}
/**
 * Checks that a malformed host entry is skipped with a warning while a well-formed entry in the same
 * list is still resolved to a discovery node.
 */
public void testInvalidHosts() throws InterruptedException {
    final Logger mockLogger = mock(Logger.class);
    final NetworkService netService = new NetworkService(Settings.EMPTY, Collections.emptyList());
    final Transport tcpTransport = new MockTcpTransport(
        Settings.EMPTY,
        threadPool,
        BigArrays.NON_RECYCLING_INSTANCE,
        new NoneCircuitBreakerService(),
        new NamedWriteableRegistry(Collections.emptyList()),
        netService,
        Version.CURRENT);
    closeables.push(tcpTransport);
    final TransportService service =
        new TransportService(Settings.EMPTY, tcpTransport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
    closeables.push(service);

    final AtomicInteger nextNodeId = new AtomicInteger();
    final List<DiscoveryNode> resolvedNodes = UnicastZenPing.resolveDiscoveryNodes(
        executorService,
        mockLogger,
        Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
        1,
        service,
        () -> Integer.toString(nextNodeId.incrementAndGet()),
        TimeValue.timeValueMillis(100));

    // only one of the two is valid and will be used
    assertThat(resolvedNodes, hasSize(1));
    final DiscoveryNode onlyNode = resolvedNodes.get(0);
    assertThat(onlyNode.getAddress().getAddress(), equalTo("127.0.0.1"));
    assertThat(onlyNode.getAddress().getPort(), equalTo(9301));
    verify(mockLogger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class));
}
// assert that we tried to ping each of the configured nodes at least once
@ -197,16 +546,20 @@ public class UnicastZenPingTests extends ESTestCase {
}
}
private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId,
Version version) {
MockTcpTransport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()), networkService, version);
final TransportService transportService = new TransportService(settings, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
private NetworkHandle startServices(
final Settings settings,
final ThreadPool threadPool,
final String nodeId,
final Version version,
final BiFunction<Settings, Version, Transport> supplier) {
final Transport transport = supplier.apply(settings, version);
final TransportService transportService =
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger());
@ -216,25 +569,32 @@ public class UnicastZenPingTests extends ESTestCase {
@Override
public void onNodeDisconnected(DiscoveryNode node) {
}
});
final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(),
version);
final DiscoveryNode node =
new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version);
transportService.setLocalNode(node);
return new NetworkHandle((TransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters);
return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters);
}
// Immutable holder that groups together the pieces a test keeps for one node: its transport address,
// its transport service, its discovery node, and a map of counters keyed by transport address.
private static class NetworkHandle {
// address associated with this handle
public final TransportAddress address;
// transport service associated with this handle
public final TransportService transportService;
// discovery node associated with this handle (passed to the constructor as discoveryNode)
public final DiscoveryNode node;
// counters keyed by transport address
public final ConcurrentMap<TransportAddress, AtomicInteger> counters;
public NetworkHandle(TransportAddress address, TransportService transportService, DiscoveryNode discoveryNode,
ConcurrentMap<TransportAddress, AtomicInteger> counters) {
public NetworkHandle(
final TransportAddress address,
final TransportService transportService,
final DiscoveryNode discoveryNode,
final ConcurrentMap<TransportAddress, AtomicInteger> counters) {
// plain field assignment; no copying or validation is performed
this.address = address;
this.transportService = transportService;
this.node = discoveryNode;
this.counters = counters;
}
}
}

View File

@ -22,21 +22,31 @@ other nodes.
[[unicast]]
===== Unicast
The unicast discovery requires a list of hosts to use that will act
as gossip routers. It provides the following settings with the
`discovery.zen.ping.unicast` prefix:
Unicast discovery requires a list of hosts to use that will act as gossip routers. These hosts can be specified as
hostnames or IP addresses; hosts specified as hostnames are resolved to IP addresses during each round of pinging. Note
that with the Java security manager in place, the JVM defaults to caching positive hostname resolutions indefinitely.
This can be modified by adding
http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`networkaddress.cache.ttl=<timeout>`] to your
http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy]. Any hosts that
fail to resolve will be logged. Note also that with the Java security manager in place, the JVM defaults to caching
negative hostname resolutions for ten seconds. This can be modified by adding
http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`networkaddress.cache.negative.ttl=<timeout>`] to your
http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy].
Unicast discovery provides the following settings with the `discovery.zen.ping.unicast` prefix:
[cols="<,<",options="header",]
|=======================================================================
|Setting |Description
|`hosts` |Either an array setting or a comma delimited setting. Each
value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be bracketed. Defaults to
`127.0.0.1, [::1]`
value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be
bracketed. Defaults to `127.0.0.1, [::1]`
|`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as
<<time-units, time units>>. Defaults to 5s.
|=======================================================================
The unicast discovery uses the
<<modules-transport,transport>> module to
perform the discovery.
The unicast discovery uses the <<modules-transport,transport>> module to perform the discovery.
[float]
[[master-election]]

View File

@ -24,7 +24,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -42,6 +41,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -78,7 +78,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
Version.CURRENT) {
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
// we just need to ensure we don't resolve DNS here
return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())};
}

View File

@ -19,21 +19,34 @@
package org.elasticsearch.discovery.file;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* Plugin for providing file-based unicast hosts discovery. The list of unicast hosts
@ -46,15 +59,45 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
private final Settings settings;
private ExecutorService fileBasedDiscoveryExecutorService;
public FileBasedDiscoveryPlugin(Settings settings) {
this.settings = settings;
}
/**
 * Builds the executor service that is later handed to the file-based unicast hosts provider and stores
 * it in {@code fileBasedDiscoveryExecutorService}. The executor is created via
 * {@code EsExecutors.newScaling} using daemon threads, with the unicast concurrent-connects setting
 * passed as its upper size argument.
 *
 * @return an empty list; this plugin contributes no components
 */
@Override
public Collection<Object> createComponents(
    Client client,
    ClusterService clusterService,
    ThreadPool threadPool,
    ResourceWatcherService resourceWatcherService,
    ScriptService scriptService,
    SearchRequestParsers searchRequestParsers) {
    final int maxResolveThreads = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
    fileBasedDiscoveryExecutorService = EsExecutors.newScaling(
        "file_based_discovery_resolve",
        0,
        maxResolveThreads,
        60,
        TimeUnit.SECONDS,
        EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]"),
        threadPool.getThreadContext());
    return Collections.emptyList();
}
// Terminates the executor service held in fileBasedDiscoveryExecutorService, passing a timeout of
// zero seconds to ThreadPool.terminate.
// NOTE(review): the field is only assigned in createComponents; confirm ThreadPool.terminate tolerates
// a null executor in case close() runs before createComponents() was ever called.
@Override
public void close() throws IOException {
ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS);
}
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService));
return Collections.singletonMap(
"file",
() -> new FileBasedUnicastHostsProvider(settings, transportService, fileBasedDiscoveryExecutorService));
}
@Override

View File

@ -23,8 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;
@ -37,10 +37,12 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT;
import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes;
/**
@ -61,15 +63,20 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
private final TransportService transportService;
private final ExecutorService executorService;
private final Path unicastHostsFilePath;
private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node
FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) {
private final TimeValue resolveTimeout;
FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ExecutorService executorService) {
super(settings);
this.transportService = transportService;
this.executorService = executorService;
this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);
this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings);
}
@Override
@ -89,15 +96,17 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
}
final List<DiscoveryNode> discoNodes = new ArrayList<>();
for (final String host : hostsList) {
try {
discoNodes.addAll(resolveDiscoveryNodes(host, 1, transportService,
() -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#"));
} catch (IllegalArgumentException e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[discovery-file] Failed to parse transport address from [{}]",
host), e);
continue;
}
try {
discoNodes.addAll(resolveDiscoveryNodes(
executorService,
logger,
hostsList,
1,
transportService,
() -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#",
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes);

View File

@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -43,6 +44,9 @@ import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX;
@ -52,17 +56,28 @@ import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNI
*/
public class FileBasedUnicastHostsProviderTests extends ESTestCase {
private static ThreadPool threadPool;
private ThreadPool threadPool;
private ExecutorService executorService;
private MockTransportService transportService;
@BeforeClass
public static void createThreadPool() {
// Per-test setup: after the superclass setup, creates a fresh TestThreadPool named after this test
// class and a single-threaded executor service for the test to use.
@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(FileBasedUnicastHostsProviderTests.class.getName());
executorService = Executors.newSingleThreadExecutor();
}
@AfterClass
public static void stopThreadPool() throws InterruptedException {
terminate(threadPool);
// Per-test teardown: terminates the executor service, then the thread pool, then runs the superclass
// teardown. The nested try/finally blocks ensure each later step still runs if an earlier one throws.
@After
public void tearDown() throws Exception {
try {
terminate(executorService);
} finally {
try {
terminate(threadPool);
} finally {
// always give the superclass a chance to clean up, even if termination above failed
super.tearDown();
}
}
}
@Before
@ -103,7 +118,7 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
final Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService);
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService, executorService);
final List<DiscoveryNode> nodes = provider.buildDynamicNodes();
assertEquals(0, nodes.size());
}
@ -136,6 +151,6 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
writer.write(String.join("\n", hostEntries));
}
return new FileBasedUnicastHostsProvider(settings, transportService).buildDynamicNodes();
return new FileBasedUnicastHostsProvider(settings, transportService, executorService).buildDynamicNodes();
}
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -208,7 +209,7 @@ public class CapturingTransport implements Transport {
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return new TransportAddress[0];
}

View File

@ -53,6 +53,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportServiceAdapter;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -506,7 +507,7 @@ public final class MockTransportService extends TransportService {
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}