From f8769e0f4b917d9fda8ff7a9fddb4d755d246a1e Mon Sep 17 00:00:00 2001 From: sunlisheng Date: Thu, 28 Jan 2021 10:10:39 +0800 Subject: [PATCH] HDFS-15661. The DeadNodeDetector should not be shared by different DFSClients. Contributed by Jinglun. --- .../org/apache/hadoop/hdfs/ClientContext.java | 45 +++++++++++++------ .../org/apache/hadoop/hdfs/DFSClient.java | 9 +++- .../apache/hadoop/hdfs/DeadNodeDetector.java | 2 +- .../hadoop/hdfs/TestDeadNodeDetection.java | 37 +++++++++++++++ 4 files changed, 77 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index b34420da5ce..47e985b68e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -40,10 +40,10 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; -import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,8 +119,6 @@ public class ClientContext { private NodeBase clientNode; private boolean topologyResolutionEnabled; - private Daemon deadNodeDetectorThr = null; - /** * The switch to DeadNodeDetector. */ @@ -130,12 +128,18 @@ public class ClientContext { * Detect the dead datanodes in advance, and share this information among all * the DFSInputStreams in the same client. 
*/ - private DeadNodeDetector deadNodeDetector = null; + private volatile DeadNodeDetector deadNodeDetector = null; + + /** + * Number of outstanding references to this ClientContext. + */ + private int counter = 0; /** * ShortCircuitCache array size. */ private final int clientShortCircuitNum; + private Configuration configuration; private ClientContext(String name, DfsClientConf conf, Configuration config) { @@ -149,6 +153,7 @@ public class ClientContext { this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); } + this.configuration = config; this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( @@ -159,11 +164,6 @@ public class ClientContext { this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); - if (deadNodeDetectionEnabled && deadNodeDetector == null) { - deadNodeDetector = new DeadNodeDetector(name, config); - deadNodeDetectorThr = new Daemon(deadNodeDetector); - deadNodeDetectorThr.start(); - } initTopologyResolution(config); } @@ -201,6 +201,7 @@ public class ClientContext { context.printConfWarningIfNeeded(conf); } } + context.reference(); return context; } @@ -301,17 +302,33 @@ public class ClientContext { } /** - * Close dead node detector thread. + * Increment the counter. Create and start the dead node detector if + * detection is enabled and no detector exists yet. */ - public void stopDeadNodeDetectorThread() { - if (deadNodeDetectorThr != null) { - deadNodeDetectorThr.interrupt(); + synchronized void reference() { + counter++; + if (deadNodeDetectionEnabled && deadNodeDetector == null) { + deadNodeDetector = new DeadNodeDetector(name, configuration); + deadNodeDetector.start(); + } + } + + /** + * Decrement the counter. Interrupt and join the dead node detector thread + * once no reference remains.
+ */ + synchronized void unreference() { + Preconditions.checkState(counter > 0); + counter--; + if (counter == 0 && deadNodeDetectionEnabled && deadNodeDetector != null) { + deadNodeDetector.interrupt(); try { - deadNodeDetectorThr.join(); + deadNodeDetector.join(); } catch (InterruptedException e) { LOG.warn("Encountered exception while waiting to join on dead " + "node detector thread.", e); } + deadNodeDetector = null; } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 861b6a9c53a..fc3a16db519 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -648,7 +648,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, clientRunning = false; // close dead node detector thread if (!disabledStopDeadNodeDetectorThreadForTest) { - clientContext.stopDeadNodeDetectorThread(); + clientContext.unreference(); } // close connections to the namenode @@ -3441,4 +3441,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private boolean isDeadNodeDetectionEnabled() { return clientContext.isDeadNodeDetectionEnabled(); } + + /** + * Obtain the DeadNodeDetector of the current client; may be null.
+ */ + public DeadNodeDetector getDeadNodeDetector() { + return clientContext.getDeadNodeDetector(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java index fd8263f88ed..112bc0407f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -62,7 +62,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCK * Detect the dead nodes in advance, and share this information among all the * DFSInputStreams in the same client. */ -public class DeadNodeDetector implements Runnable { +public class DeadNodeDetector extends Daemon { public static final Logger LOG = LoggerFactory.getLogger(DeadNodeDetector.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java index 09e670211a5..9c52fcd8d1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import java.net.URI; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -43,6 +44,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * Tests for dead node detection in DFSClient. @@ -320,6 +326,37 @@ public class TestDeadNodeDetection { } } + @Test + public void testCloseDeadNodeDetector() throws Exception { + DistributedFileSystem dfs0 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + // The DeadNodeDetector is shared by different DFSClients. + DeadNodeDetector detector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(detector); + assertSame(detector, dfs1.getClient().getDeadNodeDetector()); + // Close one client. The dead node detector should be alive. + dfs0.close(); + detector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(detector); + assertSame(detector, dfs1.getClient().getDeadNodeDetector()); + assertTrue(detector.isAlive()); + // Close all clients. The dead node detector should be closed. + dfs1.close(); + // Keep the stopped detector for the assertNotSame check below. + assertNull(dfs0.getClient().getDeadNodeDetector()); + assertNull(dfs1.getClient().getDeadNodeDetector()); + // Create a new client. The dead node detector should be alive. + dfs1 = (DistributedFileSystem) FileSystem + .newInstance(new URI("hdfs://127.0.0.1:2001/"), conf); + DeadNodeDetector newDetector = dfs0.getClient().getDeadNodeDetector(); + assertNotNull(newDetector); + assertTrue(newDetector.isAlive()); + assertNotSame(detector, newDetector); + dfs1.close(); + } + private void createFile(FileSystem fs, Path filePath) throws IOException { FSDataOutputStream out = null; try {