Stop SeedHostsResolver on shutdown (#42844)

Fixes an issue where tests would sometimes hang for 5 seconds when restarting a node. The reason
is that the SeedHostsResolver blocks waiting for a result for the full 5 seconds when the
corresponding thread pool is shut down.
This commit is contained in:
Yannick Welsch 2019-06-12 18:49:59 +02:00
parent 9d2adfb41e
commit 8711a092bf
4 changed files with 95 additions and 11 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.discovery.PeerFinder.ConfiguredHostsResolver;
@ -73,6 +74,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
private final TimeValue resolveTimeout;
private final String nodeName;
private final int concurrentConnects;
private final CancellableThreads cancellableThreads = new CancellableThreads();
public SeedHostsResolver(String nodeName, Settings settings, TransportService transportService,
SeedHostsProvider seedProvider) {
@ -121,6 +123,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
* @return a list of resolved transport addresses
*/
public static List<TransportAddress> resolveHostsLists(
final CancellableThreads cancellableThreads,
final ExecutorService executorService,
final Logger logger,
final List<String> hosts,
@ -140,11 +143,11 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
.stream()
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn))
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures;
final SetOnce<List<Future<TransportAddress[]>>> futures = new SetOnce<>();
try {
futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
cancellableThreads.execute(() ->
futures.set(executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS)));
} catch (CancellableThreads.ExecutionCancelledException e) {
return Collections.emptyList();
}
final List<TransportAddress> transportAddresses = new ArrayList<>();
@ -154,10 +157,10 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator<String> it = hosts.iterator();
for (final Future<TransportAddress[]> future : futures) {
for (final Future<TransportAddress[]> future : futures.get()) {
assert future.isDone();
final String hostname = it.next();
if (!future.isCancelled()) {
assert future.isDone();
try {
final TransportAddress[] addresses = future.get();
logger.trace("resolved host [{}] to {}", hostname, addresses);
@ -193,6 +196,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
/**
 * Stops this resolver: cancels {@code cancellableThreads} and then terminates the executor service,
 * passing a 10 second timeout to {@code ThreadPool.terminate}.
 */
@Override
protected void doStop() {
// cancel first: resolveHostsLists runs its blocking invokeAll call under cancellableThreads and returns
// an empty list when that execution is cancelled, rather than waiting for the resolve timeout
cancellableThreads.cancel("stopping SeedHostsResolver");
ThreadPool.terminate(executorService.get(), 10, TimeUnit.SECONDS);
}
@ -223,7 +227,7 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
List<TransportAddress> providedAddresses
= hostsProvider.getSeedAddresses(hosts ->
resolveHostsLists(executorService.get(), logger, hosts, transportService, resolveTimeout));
resolveHostsLists(cancellableThreads, executorService.get(), logger, hosts, transportService, resolveTimeout));
consumer.accept(providedAddresses);
}
@ -240,4 +244,8 @@ public class SeedHostsResolver extends AbstractLifecycleComponent implements Con
});
}
}
/**
 * Resolves the given hosts by delegating to {@code resolveHostsLists} with this instance's
 * cancellable threads, executor service, logger, transport service and resolve timeout.
 * Package-private; called from {@code SeedHostsResolverTests#testCancellationOnClose}.
 *
 * @param hosts the host strings to resolve
 * @return the list returned by {@code resolveHostsLists}
 */
List<TransportAddress> resolveHosts(List<String> hosts) {
return resolveHostsLists(cancellableThreads, executorService.get(), logger, hosts, transportService, resolveTimeout);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
@ -144,7 +145,8 @@ public class UnicastZenPing implements ZenPing {
}
private SeedHostsProvider.HostsResolver createHostsResolver() {
return hosts -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, transportService, resolveTimeout);
return hosts -> SeedHostsResolver.resolveHostsLists(new CancellableThreads(), unicastZenPingExecutorService, logger, hosts,
transportService, resolveTimeout);
}
@Override

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
@ -116,7 +117,8 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase {
public void testUnicastHostsDoesNotExist() {
final FileBasedSeedHostsProvider provider = new FileBasedSeedHostsProvider(createTempDir().toAbsolutePath());
final List<TransportAddress> addresses = provider.getSeedAddresses(hosts ->
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10)));
SeedHostsResolver.resolveHostsLists(new CancellableThreads(), executorService, logger, hosts, transportService,
TimeValue.timeValueSeconds(10)));
assertEquals(0, addresses.size());
}
@ -145,6 +147,7 @@ public class FileBasedSeedHostsProviderTests extends ESTestCase {
}
return new FileBasedSeedHostsProvider(configPath).getSeedAddresses(hosts ->
SeedHostsResolver.resolveHostsLists(executorService, logger, hosts, transportService, TimeValue.timeValueSeconds(10)));
SeedHostsResolver.resolveHostsLists(new CancellableThreads(), executorService, logger, hosts, transportService,
TimeValue.timeValueSeconds(10)));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.discovery;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
@ -28,8 +29,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
@ -100,6 +103,14 @@ public class SeedHostsResolverTests extends ESTestCase {
closeables = new Stack<>();
}
/**
 * Replaces the {@code seedHostsResolver} used by the test: stops the current instance if there is one,
 * then creates and starts a new resolver named "test_node" whose seed hosts provider returns the
 * {@code transportAddresses} field.
 *
 * @param transportService the transport service handed to the new resolver
 * @param settings the settings handed to the new resolver
 */
private void recreateSeedHostsResolver(TransportService transportService, Settings settings) {
if (seedHostsResolver != null) {
// stop the previous instance before replacing it
seedHostsResolver.stop();
}
seedHostsResolver = new SeedHostsResolver("test_node", settings, transportService, hostsResolver -> transportAddresses);
seedHostsResolver.start();
}
@After
public void stopResolver() throws IOException {
seedHostsResolver.stop();
@ -176,6 +187,7 @@ public class SeedHostsResolverTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
new CancellableThreads(),
executorService,
logger,
IntStream.range(9300, 9310)
@ -228,6 +240,7 @@ public class SeedHostsResolverTests extends ESTestCase {
closeables.push(transportService);
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
new CancellableThreads(),
executorService,
logger,
Arrays.asList(hostname),
@ -286,6 +299,7 @@ public class SeedHostsResolverTests extends ESTestCase {
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(3, 5));
try {
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
new CancellableThreads(),
executorService,
logger,
Arrays.asList("hostname1", "hostname2"),
@ -303,7 +317,63 @@ public class SeedHostsResolverTests extends ESTestCase {
}
}
public void testInvalidHosts() {
/**
 * Checks that stopping the resolver while a host lookup is blocked makes {@code resolveHosts} return an
 * empty list within the 10 second wait below, even though the resolve timeout is configured to 10 minutes.
 */
public void testCancellationOnClose() throws InterruptedException {
final NetworkService networkService = new NetworkService(Collections.emptyList());
// never counted down in this test: the lookup of "hostname2" stays blocked on it until interrupted
final CountDownLatch latch = new CountDownLatch(1);
// counted down once the lookup of "hostname2" has started, so the test knows when to stop the resolver
final CountDownLatch conditionLatch = new CountDownLatch(1);
final Transport transport = new MockNioTransport(
Settings.EMPTY,
Version.CURRENT,
threadPool,
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9500)
);
}
@Override
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
if ("hostname1".equals(address)) {
// resolves immediately
return new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)};
} else if ("hostname2".equals(address)) {
try {
// signal that the blocking lookup has begun, then block
conditionLatch.countDown();
latch.await();
// only reachable if latch were counted down, which this test never does
throw new AssertionError("should never be called");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
throw new UnknownHostException(address);
}
}
};
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
// resolve timeout of 10 minutes: far longer than the 10 second wait on the future at the end of the test
recreateSeedHostsResolver(transportService,
Settings.builder().put(SeedHostsResolver.DISCOVERY_SEED_RESOLVER_TIMEOUT_SETTING.getKey(), "10m").build());
final PlainActionFuture<List<TransportAddress>> fut = new PlainActionFuture<>();
// run the resolution on a generic thread pool thread so this thread is free to stop the resolver
threadPool.generic().execute((() -> fut.onResponse(seedHostsResolver.resolveHosts(Arrays.asList("hostname1", "hostname2")))));
// wait until the "hostname2" lookup is in progress before stopping
conditionLatch.await();
seedHostsResolver.stop();
// a cancelled resolution yields an empty list
assertThat(FutureUtils.get(fut, 10, TimeUnit.SECONDS), hasSize(0));
}
public void testInvalidHosts() throws IllegalAccessException {
final Logger logger = mock(Logger.class);
final Transport transport = new MockNioTransport(
Settings.EMPTY,
@ -328,6 +398,7 @@ public class SeedHostsResolverTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final List<TransportAddress> transportAddresses = SeedHostsResolver.resolveHostsLists(
new CancellableThreads(),
executorService,
logger,
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),