diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index cf6a2103470..7e926db6e78 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -339,6 +339,7 @@ public final class ClusterSettings extends AbstractScopedSettings { ZenDiscovery.MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING, UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, + UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT, SearchService.DEFAULT_KEEPALIVE_SETTING, SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 7794c58ddd3..eec9548dd08 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.zen; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; @@ -61,12 +62,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import 
java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -74,6 +80,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -89,6 +96,8 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { Property.NodeScope); public static final Setting DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING = Setting.intSetting("discovery.zen.ping.unicast.concurrent_connects", 10, 0, Property.NodeScope); + public static final Setting DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT = + Setting.positiveTimeSetting("discovery.zen.ping.unicast.hosts.resolve_timeout", TimeValue.timeValueSeconds(5), Property.NodeScope); // these limits are per-address public static final int LIMIT_FOREIGN_PORTS_COUNT = 1; @@ -100,7 +109,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final int concurrentConnects; - private final DiscoveryNode[] configuredTargetNodes; + private final List configuredHosts; + + private final int limitPortCounts; private volatile PingContextProvider contextProvider; @@ -114,12 +125,14 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { private final Map receivedResponses = newConcurrentMap(); - // a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes) + // a list of temporal responses a node will return for a request (holds responses from other nodes) private final Queue temporalResponses = ConcurrentCollections.newQueue(); private final UnicastHostsProvider hostsProvider; - private final ExecutorService unicastConnectExecutor; + private final ExecutorService unicastZenPingExecutorService; 
+ + private final TimeValue resolveTimeout; private volatile boolean closed = false; @@ -132,62 +145,110 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { this.hostsProvider = unicastHostsProvider; this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); - List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); - final int limitPortCounts; + final List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); if (hosts.isEmpty()) { // if unicast hosts are not specified, fill with simple defaults on the local machine + configuredHosts = transportService.getLocalAddresses(); limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; - hosts.addAll(transportService.getLocalAddresses()); } else { + configuredHosts = hosts; // we only limit to 1 addresses, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } - - logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); - List configuredTargetNodes = new ArrayList<>(); - for (final String host : hosts) { - configuredTargetNodes.addAll(resolveDiscoveryNodes(host, limitPortCounts, transportService, - () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#")); - } - this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]); + resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); + logger.debug( + "using initial hosts {}, with concurrent_connects [{}], resolve_timeout [{}]", + configuredHosts, + concurrentConnects, + resolveTimeout); transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler()); - ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); - unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, - threadFactory, 
threadPool.getThreadContext()); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); + unicastZenPingExecutorService = EsExecutors.newScaling( + "unicast_connect", + 0, concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); } /** - * Resolves a host to a list of discovery nodes. The host is resolved into a transport - * address (or a collection of addresses if the number of ports is greater than one) and - * the transport addresses are used to created discovery nodes. + * Resolves a list of hosts to a list of discovery nodes. Each host is resolved into a transport address (or a collection of addresses + * if the number of ports is greater than one) and the transport addresses are used to created discovery nodes. Host lookups are done + * in parallel using specified executor service up to the specified resolve timeout. * - * @param host the host to resolve - * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) + * @param executorService the executor service used to parallelize hostname lookups + * @param logger logger used for logging messages regarding hostname lookups + * @param hosts the hosts to resolve + * @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport) * @param transportService the transport service - * @param idGenerator the generator to supply unique ids for each discovery node + * @param idGenerator the generator to supply unique ids for each discovery node + * @param resolveTimeout the timeout before returning from hostname lookups * @return a list of discovery nodes with resolved transport addresses */ - public static List resolveDiscoveryNodes(final String host, final int limitPortCounts, - final TransportService transportService, final Supplier idGenerator) { - List discoveryNodes = new ArrayList<>(); - try { - TransportAddress[] addresses = 
transportService.addressesFromString(host, limitPortCounts); - for (TransportAddress address : addresses) { - discoveryNodes.add(new DiscoveryNode(idGenerator.get(), address, emptyMap(), emptySet(), - Version.CURRENT.minimumCompatibilityVersion())); + public static List resolveDiscoveryNodes( + final ExecutorService executorService, + final Logger logger, + final List hosts, + final int limitPortCounts, + final TransportService transportService, + final Supplier idGenerator, + final TimeValue resolveTimeout) throws InterruptedException { + Objects.requireNonNull(executorService); + Objects.requireNonNull(logger); + Objects.requireNonNull(hosts); + Objects.requireNonNull(transportService); + Objects.requireNonNull(idGenerator); + Objects.requireNonNull(resolveTimeout); + if (resolveTimeout.nanos() < 0) { + throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); + } + // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete + final List> callables = + hosts + .stream() + .map(hn -> (Callable)() -> transportService.addressesFromString(hn, limitPortCounts)) + .collect(Collectors.toList()); + final List> futures = + executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); + final List discoveryNodes = new ArrayList<>(); + // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the + // hostname with the corresponding task by iterating together + final Iterator it = hosts.iterator(); + for (final Future future : futures) { + final String hostname = it.next(); + if (!future.isCancelled()) { + assert future.isDone(); + try { + final TransportAddress[] addresses = future.get(); + logger.trace("resolved host [{}] to {}", hostname, addresses); + for (final TransportAddress address : addresses) { + discoveryNodes.add( + new DiscoveryNode( + idGenerator.get(), + address, + 
emptyMap(), + emptySet(), + Version.CURRENT.minimumCompatibilityVersion())); + } + } catch (final ExecutionException e) { + assert e.getCause() != null; + final String message = "failed to resolve host [" + hostname + "]"; + logger.warn(message, e.getCause()); + } + } else { + logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } - } catch (Exception e) { - throw new IllegalArgumentException("Failed to resolve address for [" + host + "]", e); } return discoveryNodes; } @Override public void close() { - ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); + ThreadPool.terminate(unicastZenPingExecutorService, 0, TimeUnit.SECONDS); Releasables.close(receivedResponses.values()); closed = true; } @@ -220,27 +281,49 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } } + /** + * Sends three rounds of pings notifying the specified {@link PingListener} when pinging is complete. Pings are sent after resolving + * configured unicast hosts to their IP address (subject to DNS caching within the JVM). A batch of pings is sent, then another batch + * of pings is sent at half the specified {@link TimeValue}, and then another batch of pings is sent at the specified {@link TimeValue}. + * The pings that are sent carry a timeout of 1.25 times the {@link TimeValue}. 
+ * + * @param listener the callback when pinging is complete + * @param duration the timeout for various components of the pings + */ @Override public void ping(final PingListener listener, final TimeValue duration) { + final List resolvedDiscoveryNodes; + try { + resolvedDiscoveryNodes = resolveDiscoveryNodes( + unicastZenPingExecutorService, + logger, + configuredHosts, + limitPortCounts, + transportService, + () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", + resolveTimeout); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet()); try { receivedResponses.put(sendPingsHandler.id(), sendPingsHandler); try { - sendPings(duration, null, sendPingsHandler); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); } catch (RejectedExecutionException e) { logger.debug("Ping execution rejected", e); - // The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings + // The RejectedExecutionException can come from the fact unicastZenPingExecutorService is at its max down in sendPings // But don't bail here, we can retry later on after the send ping has been scheduled. 
} threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() { - sendPings(duration, null, sendPingsHandler); + sendPings(duration, null, sendPingsHandler, resolvedDiscoveryNodes); threadPool.schedule(TimeValue.timeValueMillis(duration.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() { @Override protected void doRun() throws Exception { - sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler); + sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes); sendPingsHandler.close(); listener.onPing(sendPingsHandler.pingCollection().toList()); for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { @@ -305,7 +388,11 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { } - void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) { + void sendPings( + final TimeValue timeout, + @Nullable TimeValue waitTime, + final SendPingsHandler sendPingsHandler, + final List resolvedDiscoveryNodes) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; @@ -330,8 +417,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing { // sort the nodes by likelihood of being an active master List sortedNodesToPing = ElectMasterService.sortByMasterLikelihood(nodesToPingSet); - // new add the unicast targets first - List nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes); + // add the configured hosts first + final List nodesToPing = new ArrayList<>(resolvedDiscoveryNodes.size() + sortedNodesToPing.size()); + nodesToPing.addAll(resolvedDiscoveryNodes); nodesToPing.addAll(sortedNodesToPing); final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); @@ -369,7 +457,7 @@ public class 
UnicastZenPing extends AbstractComponent implements ZenPing { } // fork the connection to another thread final DiscoveryNode finalNodeToSend = nodeToSend; - unicastConnectExecutor.execute(new Runnable() { + unicastZenPingExecutorService.execute(new Runnable() { @Override public void run() { if (sendPingsHandler.isClosed()) { diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 086c48c4114..a68863d0e52 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; - import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; @@ -715,7 +714,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit); } diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 9a85a867888..c3c178a2c84 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; +import java.net.UnknownHostException; import java.util.List; import java.util.Map; @@ -53,7 +54,7 @@ public interface Transport 
extends LifecycleComponent { /** * Returns an address from its string representation. */ - TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception; + TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException; /** * Is the address type supported. diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 7b1c83d66aa..16c1842adca 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -52,6 +52,7 @@ import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -617,7 +618,7 @@ public class TransportService extends AbstractLifecycleComponent { return requestIds.getAndIncrement(); } - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return transport.addressesFromString(address, perAddressLimit); } diff --git a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index bc771f5721d..e94b0b0c8d6 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/core/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Collections; import 
java.util.Map; import java.util.Random; @@ -133,7 +134,7 @@ abstract class FailAndRetryMockTransport imp } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 863349e897a..5dcbefbe034 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -42,6 +42,7 @@ import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -188,7 +189,7 @@ public class NodeConnectionsServiceTests extends ESTestCase { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return new TransportAddress[0]; } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 4294bdd3dd4..4549b78feef 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -34,53 +35,121 @@ import org.elasticsearch.common.transport.TransportAddress; 
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; +import org.junit.After; +import org.junit.Before; +import org.mockito.Matchers; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static 
org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class UnicastZenPingTests extends ESTestCase { + + private ThreadPool threadPool; + private ExecutorService executorService; + // close in reverse order as opened + private Stack closeables; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory("[" + getClass().getName() + "]"); + executorService = + EsExecutors.newScaling(getClass().getName(), 0, 2, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext()); + closeables = new Stack<>(); + } + + @After + public void tearDown() throws Exception { + try { + // JDK stack is broken, it does not iterate in the expected order (http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4475301) + final List reverse = new ArrayList<>(); + while (!closeables.isEmpty()) { + reverse.add(closeables.pop()); + } + IOUtils.close(reverse); + } finally { + terminate(executorService); + terminate(threadPool); + super.tearDown(); + } + } + private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList; public void testSimplePings() throws IOException, InterruptedException { - int startPort = 11000 + randomIntBetween(0, 1000); - int endPort = startPort + 10; - Settings settings = Settings.builder() - .put("cluster.name", "test") - .put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); + // use ephemeral ports + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + final Settings settingsMismatch = + Settings.builder().put(settings).put("cluster.name", "mismatch").put(TransportSettings.PORT.getKey(), 0).build(); - Settings settingsMismatch = 
Settings.builder().put(settings) - .put("cluster.name", "mismatch") - .put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build(); - - ThreadPool threadPool = new TestThreadPool(getClass().getName()); NetworkService networkService = new NetworkService(settings, Collections.emptyList()); - NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", Version.CURRENT); - NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", Version.CURRENT); - NetworkHandle handleC = startServices(settingsMismatch, threadPool, networkService, "UZP_C", Version.CURRENT); + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v); + + NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); + closeables.push(handleB.transportService); + NetworkHandle handleC = startServices(settingsMismatch, threadPool, "UZP_C", Version.CURRENT, supplier); + closeables.push(handleC.transportService); // just fake that no versions are compatible with this node Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()); Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); - NetworkHandle handleD = startServices(settingsMismatch, threadPool, networkService, "UZP_D", versionD); + NetworkHandle handleD = startServices(settingsMismatch, threadPool, "UZP_D", versionD, supplier); + closeables.push(handleD.transportService); final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); @@ -106,6 +175,7 @@ public class 
UnicastZenPingTests extends ESTestCase { return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); } }); + closeables.push(zenPingA); UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); zenPingB.start(new PingContextProvider() { @@ -119,6 +189,7 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); + closeables.push(zenPingB); UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) { @Override @@ -137,6 +208,7 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); + closeables.push(zenPingC); UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER); zenPingD.start(new PingContextProvider() { @@ -150,42 +222,319 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); + closeables.push(zenPingD); - try { - logger.info("ping from UZP_A"); - Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + logger.info("ping from UZP_A"); + Collection pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ZenPing.PingResponse ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.getClusterStateVersion(), equalTo(state.version())); + assertCounters(handleA, handleA, handleB, handleC, handleD); + + // ping again, this time from B, + logger.info("ping from UZP_B"); + pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(1)); + ping = pingResponses.iterator().next(); + assertThat(ping.node().getId(), equalTo("UZP_A")); + assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); + 
assertCounters(handleB, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_C"); + pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(0)); + assertCounters(handleC, handleA, handleB, handleC, handleD); + + logger.info("ping from UZP_D"); + pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.size(), equalTo(0)); + assertCounters(handleD, handleA, handleB, handleC, handleD); + } + + public void testUnknownHostNotCached() { + // use ephemeral ports + final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build(); + + final NetworkService networkService = new NetworkService(settings, Collections.emptyList()); + + final Map addresses = new HashMap<>(); + final BiFunction supplier = (s, v) -> new MockTcpTransport( + s, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + v) { + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + final TransportAddress[] transportAddresses = addresses.get(address); + if (transportAddresses == null) { + throw new UnknownHostException(address); + } else { + return transportAddresses; + } + } + }; + + final NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier); + closeables.push(handleA.transportService); + final NetworkHandle handleB = startServices(settings, threadPool, "UZP_B", Version.CURRENT, supplier); + closeables.push(handleB.transportService); + final NetworkHandle handleC = startServices(settings, threadPool, "UZP_C", Version.CURRENT, supplier); + closeables.push(handleC.transportService); + + addresses.put( + "UZP_A", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleA.address.address().getAddress(), 
handleA.address.address().getPort()))}); + addresses.put( + "UZP_C", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort()))}); + + final Settings hostsSettings = Settings.builder() + .putArray("discovery.zen.ping.unicast.hosts", "UZP_A", "UZP_B", "UZP_C") + .put("cluster.name", "test") + .build(); + + final ClusterState state = ClusterState.builder(new ClusterName("test")).version(randomPositiveLong()).build(); + + final UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); + zenPingA.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); + } + + @Override + public ClusterState clusterState() { + return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); + } + }); + closeables.push(zenPingA); + + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); + zenPingB.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingB); + + UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER); + zenPingC.start(new PingContextProvider() { + @Override + public DiscoveryNodes nodes() { + return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); + } + + @Override + public ClusterState clusterState() { + return state; + } + }); + closeables.push(zenPingC); + + // the presence of an unresolvable host should not prevent resolvable hosts from being pinged + { + final Collection pingResponses = 
zenPingA.pingAndWait(TimeValue.timeValueSeconds(3)); assertThat(pingResponses.size(), equalTo(1)); ZenPing.PingResponse ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_B")); + assertThat(ping.node().getId(), equalTo("UZP_C")); assertThat(ping.getClusterStateVersion(), equalTo(state.version())); - assertCounters(handleA, handleA, handleB, handleC, handleD); - - // ping again, this time from B, - logger.info("ping from UZP_B"); - pingResponses = zenPingB.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(1)); - ping = pingResponses.iterator().next(); - assertThat(ping.node().getId(), equalTo("UZP_A")); - assertThat(ping.getClusterStateVersion(), equalTo(ElectMasterService.MasterCandidate.UNRECOVERED_CLUSTER_VERSION)); - assertCounters(handleB, handleA, handleB, handleC, handleD); - - logger.info("ping from UZP_C"); - pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(0)); - assertCounters(handleC, handleA, handleB, handleC, handleD); - - logger.info("ping from UZP_D"); - pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(1)); - assertThat(pingResponses.size(), equalTo(0)); - assertCounters(handleD, handleA, handleB, handleC, handleD); - } finally { - try { - IOUtils.close(zenPingA, zenPingB, zenPingC, zenPingD, - handleA.transportService, handleB.transportService, handleC.transportService, handleD.transportService); - } finally { - terminate(threadPool); - } + assertCounters(handleA, handleA, handleC); + assertNull(handleA.counters.get(handleB.address)); } + + // now allow UZP_B to be resolvable + addresses.put( + "UZP_B", + new TransportAddress[]{ + new TransportAddress( + new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort()))}); + + // now we should see pings to UZP_B; this establishes that host resolutions are not cached + { + // ping from C so that we can assert on the 
counters from a fresh source (as opposed to resetting them) + final Collection secondPingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(3)); + assertThat(secondPingResponses.size(), equalTo(2)); + final Set ids = new HashSet<>(secondPingResponses.stream().map(p -> p.node().getId()).collect(Collectors.toList())); + assertThat(ids, equalTo(new HashSet<>(Arrays.asList("UZP_A", "UZP_B")))); + assertCounters(handleC, handleA, handleB, handleC); + } + } + + public void testPortLimit() throws InterruptedException { + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT); + closeables.push(transport); + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + final int limitPortCounts = randomIntBetween(1, 10); + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + executorService, + logger, + Collections.singletonList("127.0.0.1"), + limitPortCounts, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100)); + assertThat(discoveryNodes, hasSize(limitPortCounts)); + final Set ports = new HashSet<>(); + for (final DiscoveryNode discoveryNode : discoveryNodes) { + assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress()); + ports.add(discoveryNode.getAddress().getPort()); + } + assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet()))); + } + + public void testUnknownHost() throws InterruptedException { + final Logger logger = 
mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final String hostname = randomAsciiOfLength(8); + final UnknownHostException unknownHostException = new UnknownHostException(hostname); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT) { + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + throw unknownHostException; + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + executorService, + logger, + Arrays.asList(hostname), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100) + ); + + assertThat(discoveryNodes, empty()); + verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException); + } + + public void testResolveTimeout() throws InterruptedException { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final CountDownLatch latch = new CountDownLatch(1); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT) { + + @Override + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { + if 
("hostname1".equals(address)) { + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } else if ("hostname2".equals(address)) { + try { + latch.await(); + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + throw new UnknownHostException(address); + } + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + final TimeValue resolveTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 100)); + try { + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + executorService, + logger, + Arrays.asList("hostname1", "hostname2"), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + resolveTimeout); + + assertThat(discoveryNodes, hasSize(1)); + verify(logger).trace( + "resolved host [{}] to {}", "hostname1", + new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}); + verify(logger).warn("timed out after [{}] resolving host [{}]", resolveTimeout, "hostname2"); + verifyNoMoreInteractions(logger); + } finally { + latch.countDown(); + } + } + + public void testInvalidHosts() throws InterruptedException { + final Logger logger = mock(Logger.class); + final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList()); + final Transport transport = new MockTcpTransport( + Settings.EMPTY, + threadPool, + BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), + new NamedWriteableRegistry(Collections.emptyList()), + networkService, + Version.CURRENT); + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, 
transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + closeables.push(transportService); + final AtomicInteger idGenerator = new AtomicInteger(); + final List discoveryNodes = UnicastZenPing.resolveDiscoveryNodes( + executorService, + logger, + Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"), + 1, + transportService, + () -> Integer.toString(idGenerator.incrementAndGet()), + TimeValue.timeValueMillis(100)); + assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used + assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1")); + assertThat(discoveryNodes.get(0).getAddress().getPort(), equalTo(9301)); + verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class)); } // assert that we tried to ping each of the configured nodes at least once @@ -197,16 +546,20 @@ public class UnicastZenPingTests extends ESTestCase { } } - private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, - Version version) { - MockTcpTransport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, - new NoneCircuitBreakerService(), new NamedWriteableRegistry(Collections.emptyList()), networkService, version); - final TransportService transportService = new TransportService(settings, transport, threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); + private NetworkHandle startServices( + final Settings settings, + final ThreadPool threadPool, + final String nodeId, + final Version version, + final BiFunction supplier) { + final Transport transport = supplier.apply(settings, version); + final TransportService transportService = + new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null); transportService.start(); transportService.acceptIncomingRequests(); - ConcurrentMap counters = 
ConcurrentCollections.newConcurrentMap(); + final ConcurrentMap counters = ConcurrentCollections.newConcurrentMap(); transportService.addConnectionListener(new TransportConnectionListener() { + @Override public void onNodeConnected(DiscoveryNode node) { counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger()); @@ -216,25 +569,32 @@ public class UnicastZenPingTests extends ESTestCase { @Override public void onNodeDisconnected(DiscoveryNode node) { } + }); - final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), - version); + final DiscoveryNode node = + new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version); transportService.setLocalNode(node); - return new NetworkHandle((TransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters); + return new NetworkHandle(transport.boundAddress().publishAddress(), transportService, node, counters); } private static class NetworkHandle { + public final TransportAddress address; public final TransportService transportService; public final DiscoveryNode node; public final ConcurrentMap counters; - public NetworkHandle(TransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, - ConcurrentMap counters) { + public NetworkHandle( + final TransportAddress address, + final TransportService transportService, + final DiscoveryNode discoveryNode, + final ConcurrentMap counters) { this.address = address; this.transportService = transportService; this.node = discoveryNode; this.counters = counters; } + } + } diff --git a/docs/reference/modules/discovery/zen.asciidoc b/docs/reference/modules/discovery/zen.asciidoc index 082567053b2..9202e1b9842 100644 --- a/docs/reference/modules/discovery/zen.asciidoc +++ b/docs/reference/modules/discovery/zen.asciidoc @@ -22,21 +22,31 @@ other nodes. 
[[unicast]] ===== Unicast -The unicast discovery requires a list of hosts to use that will act -as gossip routers. It provides the following settings with the -`discovery.zen.ping.unicast` prefix: +Unicast discovery requires a list of hosts to use that will act as gossip routers. These hosts can be specified as +hostnames or IP addresses; hosts specified as hostnames are resolved to IP addresses during each round of pinging. Note +that with the Java security manager in place, the JVM defaults to caching positive hostname resolutions indefinitely. +This can be modified by adding +http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`networkaddress.cache.ttl=<timeout>`] to your +http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy]. Any hosts that +fail to resolve will be logged. Note also that with the Java security manager in place, the JVM defaults to caching +negative hostname resolutions for ten seconds. This can be modified by adding +http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html +[`networkaddress.cache.negative.ttl=<timeout>`] to your + http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java security policy]. + +Unicast discovery provides the following settings with the `discovery.zen.ping.unicast` prefix: [cols="<,<",options="header",] |======================================================================= |Setting |Description |`hosts` |Either an array setting or a comma delimited setting. Each -value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be bracketed. Defaults to -`127.0.0.1, [::1]` +value should be in the form of `host:port` or `host` (where `port` defaults to `9300`). Note that IPv6 hosts must be +bracketed. Defaults to `127.0.0.1, [::1]` +|`hosts.resolve_timeout` |The amount of time to wait for DNS lookups on each round of pinging. Specified as +<<time-units, time units>>. Defaults to 5s.
|======================================================================= -The unicast discovery uses the -<<modules-transport,transport>> module to -perform the discovery. +The unicast discovery uses the <<modules-transport,transport>> module to perform the discovery. [float] [[master-election]] diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index 055d9df8465..fe6e19f966f 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cloud.aws.AwsEc2Service; import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; @@ -42,6 +41,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -78,7 +78,7 @@ public class Ec2DiscoveryTests extends ESTestCase { new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()), Version.CURRENT) { @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { // we just need to ensure we don't resolve DNS here return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())}; } diff --git
a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java index d93725a03c3..7499232346d 100644 --- a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedDiscoveryPlugin.java @@ -19,21 +19,34 @@ package org.elasticsearch.discovery.file; -import java.util.Collections; -import java.util.Map; -import java.util.function.Supplier; - import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.DiscoveryPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchRequestParsers; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * Plugin for providing file-based unicast hosts discovery. 
The list of unicast hosts @@ -46,15 +59,45 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger); private final Settings settings; + private ExecutorService fileBasedDiscoveryExecutorService; public FileBasedDiscoveryPlugin(Settings settings) { this.settings = settings; } + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + SearchRequestParsers searchRequestParsers) { + final int concurrentConnects = UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[file_based_discovery_resolve]"); + fileBasedDiscoveryExecutorService = EsExecutors.newScaling( + "file_based_discovery_resolve", + 0, + concurrentConnects, + 60, + TimeUnit.SECONDS, + threadFactory, + threadPool.getThreadContext()); + + return Collections.emptyList(); + } + + @Override + public void close() throws IOException { + ThreadPool.terminate(fileBasedDiscoveryExecutorService, 0, TimeUnit.SECONDS); + } + @Override public Map> getZenHostsProviders(TransportService transportService, NetworkService networkService) { - return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService)); + return Collections.singletonMap( + "file", + () -> new FileBasedUnicastHostsProvider(settings, transportService, fileBasedDiscoveryExecutorService)); } @Override diff --git a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java index d7323d43acc..55e2029c8b7 100644 --- 
a/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java +++ b/plugins/discovery-file/src/main/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProvider.java @@ -23,8 +23,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.env.Environment; import org.elasticsearch.transport.TransportService; @@ -37,10 +37,12 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.discovery.zen.UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT; import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNodes; /** @@ -61,15 +63,20 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_"; private final TransportService transportService; + private final ExecutorService executorService; private final Path unicastHostsFilePath; private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node - FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) { + private final TimeValue resolveTimeout; + + FileBasedUnicastHostsProvider(Settings settings, TransportService transportService, ExecutorService executorService) { super(settings); this.transportService = transportService; + this.executorService = 
executorService; this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE); + this.resolveTimeout = DISCOVERY_ZEN_PING_UNICAST_HOSTS_RESOLVE_TIMEOUT.get(settings); } @Override @@ -89,15 +96,17 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast } final List discoNodes = new ArrayList<>(); - for (final String host : hostsList) { - try { - discoNodes.addAll(resolveDiscoveryNodes(host, 1, transportService, - () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#")); - } catch (IllegalArgumentException e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[discovery-file] Failed to parse transport address from [{}]", - host), e); - continue; - } + try { + discoNodes.addAll(resolveDiscoveryNodes( + executorService, + logger, + hostsList, + 1, + transportService, + () -> UNICAST_HOST_PREFIX + nodeIdGenerator.incrementAndGet() + "#", + resolveTimeout)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes); diff --git a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java index ffb9726d264..920792b6c7a 100644 --- a/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java +++ b/plugins/discovery-file/src/test/java/org/elasticsearch/discovery/file/FileBasedUnicastHostsProviderTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.MockTcpTransport; import org.elasticsearch.transport.TransportService; +import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -43,6 +44,9 @@ import 
java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX; @@ -52,17 +56,28 @@ import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNI */ public class FileBasedUnicastHostsProviderTests extends ESTestCase { - private static ThreadPool threadPool; + private ThreadPool threadPool; + private ExecutorService executorService; private MockTransportService transportService; - @BeforeClass - public static void createThreadPool() { + @Before + public void setUp() throws Exception { + super.setUp(); threadPool = new TestThreadPool(FileBasedUnicastHostsProviderTests.class.getName()); + executorService = Executors.newSingleThreadExecutor(); } - @AfterClass - public static void stopThreadPool() throws InterruptedException { - terminate(threadPool); + @After + public void tearDown() throws Exception { + try { + terminate(executorService); + } finally { + try { + terminate(threadPool); + } finally { + super.tearDown(); + } + } } @Before @@ -103,7 +118,7 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { final Settings settings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .build(); - final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService); + final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(settings, transportService, executorService); final List nodes = provider.buildDynamicNodes(); assertEquals(0, nodes.size()); } @@ -136,6 +151,6 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase { writer.write(String.join("\n", hostEntries)); } - return new 
FileBasedUnicastHostsProvider(settings, transportService).buildDynamicNodes(); + return new FileBasedUnicastHostsProvider(settings, transportService, executorService).buildDynamicNodes(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 4ff899aeac2..6b3ed0bbad0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -208,7 +209,7 @@ public class CapturingTransport implements Transport { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return new TransportAddress[0]; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index dfa30874221..ac400065386 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -53,6 +53,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportServiceAdapter; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -506,7 +507,7 @@ public final class MockTransportService extends 
TransportService { } @Override - public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws Exception { + public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException { return transport.addressesFromString(address, perAddressLimit); }