From 8711a092bffc9e2a7b16a427cadefe1b89a05e1e Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 12 Jun 2019 18:49:59 +0200 Subject: [PATCH] Stop SeedHostsResolver on shutdown (#42844) Fixes an issue where tests would sometimes hang for 5 seconds when restarting a node. The reason is that the SeedHostsResolver blocks for the full 5 seconds waiting on a result when the corresponding thread pool is shut down. --- .../discovery/SeedHostsResolver.java | 22 ++++-- .../discovery/zen/UnicastZenPing.java | 4 +- .../FileBasedSeedHostsProviderTests.java | 7 +- .../discovery/SeedHostsResolverTests.java | 73 ++++++++++++++++++- 4 files changed, 95 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java index 61a0b213d63..f25d25b0211 100644 --- a/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java +++ b/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver; @@ -73,6 +74,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con private final TimeValue resolveTimeout; private final String nodeName; private final int concurrentConnects; + private final CancellableThreads cancellableThreads = new CancellableThreads(); public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService, SeedHostsProvider seedProvider) { @@ -121,6 +123,7 @@ public class SeedHostsResolver 
extends AbstractLifecycleComponent implements Con * @return a list of resolved transport addresses */ public static List resolveHostsLists( + final CancellableThreads cancellableThreads, final ExecutorService executorService, final Logger logger, final List hosts, @@ -140,11 +143,11 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con .stream() .map(hn -> (Callable) () -> transportService.addressesFromString(hn)) .collect(Collectors.toList()); - final List> futures; + final SetOnce>> futures = new SetOnce<>(); try { - futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + cancellableThreads.execute(() -> + futures.set(executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS))); + } catch (CancellableThreads.ExecutionCancelledException e) { return Collections.emptyList(); } final List transportAddresses = new ArrayList<>(); @@ -154,10 +157,10 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together final Iterator it = hosts.iterator(); - for (final Future future : futures) { + for (final Future future : futures.get()) { + assert future.isDone(); final String hostname = it.next(); if (!future.isCancelled()) { - assert future.isDone(); try { final TransportAddress[] addresses = future.get(); logger.trace("resolved host [{}] to {}", hostname, addresses); @@ -193,6 +196,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con @Override protected void doStop() { + cancellableThreads.cancel("stopping SeedHostsResolver"); ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS); } @@ -223,7 +227,7 @@ public class SeedHostsResolver extends 
AbstractLifecycleComponent implements Con List providedAddresses = hostsProvider.getSeedAddresses(hosts -> - resolveHostsLists(executorService.get(), logger, hosts, transportService, resolveTimeout)); + resolveHostsLists(cancellableThreads, executorService.get(), logger, hosts, transportService, resolveTimeout)); consumer.accept(providedAddresses); } @@ -240,4 +244,8 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con }); } } + + List resolveHosts(List hosts) { + return resolveHostsLists(cancellableThreads, executorService.get(), logger, hosts, transportService, resolveTimeout); + } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index f918e254f80..057de230cc0 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -144,7 +145,8 @@ public class UnicastZenPing implements ZenPing { } private SeedHostsProvider.HostsResolver createHostsResolver() { - return hosts -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, transportService, resolveTimeout); + return hosts -> SeedHostsResolver.resolveHostsLists(new CancellableThreads(), unicastZenPingExecutorService, logger, hosts, + transportService, resolveTimeout); } @Override diff --git a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java 
b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java index ccd929d5ade..0251b5bead9 100644 --- a/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/FileBasedSeedHostsProviderTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -116,7 +117,8 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase { public void testUnicastHostsDoesNotExist() { final FileBasedSeedHostsProvider provider = new FileBasedSeedHostsProvider(createTempDir().toAbsolutePath()); final List addresses = provider.getSeedAddresses(hosts -> - SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10))); + SeedHostsResolver.resolveHostsLists(new CancellableThreads(), executorService, logger, hosts, transportService, + TimeValue.timeValueSeconds(10))); assertEquals(0, addresses.size()); } @@ -145,6 +147,7 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase { } return new FileBasedSeedHostsProvider(configPath).getSeedAddresses(hosts -> - SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10))); + SeedHostsResolver.resolveHostsLists(new CancellableThreads(), executorService, logger, hosts, transportService, + TimeValue.timeValueSeconds(10))); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java index 
451548bd743..72f4971ad24 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/SeedHostsResolverTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.discovery; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkService; @@ -28,8 +29,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -100,6 +103,14 @@ public class SeedHostsResolverTests extends ESTestCase { closeables = new Stack<>(); } + private void recreateSeedHostsResolver(TransportService transportService, Settings settings) { + if (seedHostsResolver != null) { + seedHostsResolver.stop(); + } + seedHostsResolver = new SeedHostsResolver("test_node", settings, transportService, hostsResolver -> transportAddresses); + seedHostsResolver.start(); + } + @After public void stopResolver() throws IOException { seedHostsResolver.stop(); @@ -176,6 +187,7 @@ public class SeedHostsResolverTests extends ESTestCase { Collections.emptySet()); closeables.push(transportService); final List transportAddresses = SeedHostsResolver.resolveHostsLists( + new CancellableThreads(), executorService, logger, IntStream.range(9300, 
9310) @@ -228,6 +240,7 @@ public class SeedHostsResolverTests extends ESTestCase { closeables.push(transportService); final List transportAddresses = SeedHostsResolver.resolveHostsLists( + new CancellableThreads(), executorService, logger, Arrays.asList(hostname), @@ -286,6 +299,7 @@ public class SeedHostsResolverTests extends ESTestCase { final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(3, 5)); try { final List transportAddresses = SeedHostsResolver.resolveHostsLists( + new CancellableThreads(), executorService, logger, Arrays.asList("hostname1", "hostname2"), @@ -303,7 +317,63 @@ public class SeedHostsResolverTests extends ESTestCase { } } - public void testInvalidHosts() { + public void testCancellationOnClose() throws InterruptedException { + final NetworkService networkService = new NetworkService(Collections.emptyList()); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch conditionLatch = new CountDownLatch(1); + final Transport transport = new MockNioTransport( + Settings.EMPTY, + Version.CURRENT, + threadPool, + networkService, + PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), + new NoneCircuitBreakerService()) { + + @Override + public BoundTransportAddress boundAddress() { + return new BoundTransportAddress( + new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)}, + new TransportAddress(InetAddress.getLoopbackAddress(), 9500) + ); + } + + @Override + public TransportAddress[] addressesFromString(String address) throws UnknownHostException { + if ("hostname1".equals(address)) { + return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)}; + } else if ("hostname2".equals(address)) { + try { + conditionLatch.countDown(); + latch.await(); + throw new AssertionError("should never be called"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + throw new 
UnknownHostException(address); + } + } + + }; + closeables.push(transport); + + final TransportService transportService = + new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, + Collections.emptySet()); + closeables.push(transportService); + recreateSeedHostsResolver(transportService, + Settings.builder().put(SeedHostsResolver.DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey(), "10m").build()); + + final PlainActionFuture> fut = new PlainActionFuture<>(); + threadPool.generic().execute((() -> fut.onResponse(seedHostsResolver.resolveHosts(Arrays.asList("hostname1", "hostname2"))))); + + conditionLatch.await(); + seedHostsResolver.stop(); + assertThat(FutureUtils.get(fut, 10, TimeUnit.SECONDS), hasSize(0)); + } + + public void testInvalidHosts() throws IllegalAccessException { final Logger logger = mock(Logger.class); final Transport transport = new MockNioTransport( Settings.EMPTY, @@ -328,6 +398,7 @@ public class SeedHostsResolverTests extends ESTestCase { Collections.emptySet()); closeables.push(transportService); final List transportAddresses = SeedHostsResolver.resolveHostsLists( + new CancellableThreads(), executorService, logger, Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),