HDFS-15806. DeadNodeDetector should close all the threads when it is closed. Contributed by Jinglun.
This commit is contained in:
parent
2ce5752fa8
commit
ff84a57483
|
@ -321,13 +321,7 @@ public class ClientContext {
|
|||
Preconditions.checkState(counter > 0);
|
||||
counter--;
|
||||
if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) {
|
||||
deadNodeDetector.interrupt();
|
||||
try {
|
||||
deadNodeDetector.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Encountered exception while waiting to join on dead " +
|
||||
"node detector thread.", e);
|
||||
}
|
||||
deadNodeDetector.shutdown();
|
||||
deadNodeDetector = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -271,6 +271,37 @@ public class DeadNodeDetector extends Daemon {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all the threads.
|
||||
*/
|
||||
public void shutdown() {
|
||||
threadShutDown(this);
|
||||
threadShutDown(probeDeadNodesSchedulerThr);
|
||||
threadShutDown(probeSuspectNodesSchedulerThr);
|
||||
probeDeadNodesThreadPool.shutdown();
|
||||
probeSuspectNodesThreadPool.shutdown();
|
||||
rpcThreadPool.shutdown();
|
||||
}
|
||||
|
||||
private static void threadShutDown(Thread thread) {
|
||||
if (thread != null && thread.isAlive()) {
|
||||
thread.interrupt();
|
||||
try {
|
||||
thread.join();
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isThreadsShutdown() {
|
||||
return !this.isAlive() && !probeDeadNodesSchedulerThr.isAlive()
|
||||
&& !probeSuspectNodesSchedulerThr.isAlive()
|
||||
&& probeDeadNodesThreadPool.isShutdown()
|
||||
&& probeSuspectNodesThreadPool.isShutdown()
|
||||
&& rpcThreadPool.isShutdown();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static void setDisabledProbeThreadForTest(
|
||||
boolean disabledProbeThreadForTest) {
|
||||
|
|
|
@ -357,6 +357,18 @@ public class TestDeadNodeDetection {
|
|||
dfs1.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeadNodeDetectorThreadsShutdown() throws Exception {
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
|
||||
.newInstance(new URI("hdfs://127.0.0.1:2001/"), conf);
|
||||
DeadNodeDetector detector = dfs.getClient().getDeadNodeDetector();
|
||||
assertNotNull(detector);
|
||||
dfs.close();
|
||||
assertTrue(detector.isThreadsShutdown());
|
||||
detector = dfs.getClient().getDeadNodeDetector();
|
||||
assertNull(detector);
|
||||
}
|
||||
|
||||
private void createFile(FileSystem fs, Path filePath) throws IOException {
|
||||
FSDataOutputStream out = null;
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue