Do not use CancellableThreads for Zen1 (#44430)

Zen 1 stops pinging threads in ZenDiscovery by calling Thread.interrupt(). This is incompatible with
the CancellableThreads that only allow threads to be interrupted through cancellation. The use of
CancellableThreads was introduced in #42844 and added to UnicastZenPing as part of the
backport, as both Zen1 and Zen2 share the same SeedHostsResolver implementation. This commit
effectively undoes the change in the backport while still allowing to share same implementation.

Closes #44425
This commit is contained in:
Yannick Welsch 2019-07-17 17:32:47 +02:00 committed by GitHub
parent d98b3e4760
commit ddd740162e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 5 deletions

View File

@ -101,8 +101,6 @@ public class UnicastZenPing implements ZenPing {
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger(); private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap(); private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds responses from other nodes) // a list of temporal responses a node will return for a request (holds responses from other nodes)
@ -149,13 +147,21 @@ public class UnicastZenPing implements ZenPing {
} }
private SeedHostsProvider.HostsResolver createHostsResolver() { private SeedHostsProvider.HostsResolver createHostsResolver() {
return hosts -> SeedHostsResolver.resolveHostsLists(cancellableThreads, unicastZenPingExecutorService, logger, hosts, return hosts -> SeedHostsResolver.resolveHostsLists(
transportService, resolveTimeout); new CancellableThreads() {
public void execute(Interruptible interruptible) {
try {
interruptible.run();
} catch (InterruptedException e) {
throw new CancellableThreads.ExecutionCancelledException("interrupted by " + e);
}
}
},
unicastZenPingExecutorService, logger, hosts, transportService, resolveTimeout);
} }
@Override @Override
public void close() { public void close() {
cancellableThreads.cancel("stopping UnicastZenPing");
ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS); ThreadPool.terminate(unicastZenPingExecutorService, 10, TimeUnit.SECONDS);
Releasables.close(activePingingRounds.values()); Releasables.close(activePingingRounds.values());
closed = true; closed = true;