HDFS-15809. DeadNodeDetector does not remove live nodes from dead node set. Contributed by Jinglun.

(cherry picked from commit 7025f39944)
This commit is contained in:
sunlisheng 2021-03-15 11:34:13 +08:00 committed by Wei-Chiu Chuang
parent 94766fdb13
commit a8c0083b1b
4 changed files with 109 additions and 81 deletions

View File

@ -29,9 +29,9 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -40,8 +40,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
@ -54,9 +52,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT;
/** /**
* Detect the dead nodes in advance, and share this information among all the * Detect the dead nodes in advance, and share this information among all the
@ -74,7 +72,7 @@ public class DeadNodeDetector extends Daemon {
/** /**
* Waiting time when DeadNodeDetector's state is idle. * Waiting time when DeadNodeDetector's state is idle.
*/ */
private static final long IDLE_SLEEP_MS = 10000; private final long idleSleepMs;
/** /**
* Client context name. * Client context name.
@ -113,16 +111,6 @@ public class DeadNodeDetector extends Daemon {
*/ */
private long suspectNodeDetectInterval = 0; private long suspectNodeDetectInterval = 0;
/**
* The max queue size of probing dead node.
*/
private int maxDeadNodesProbeQueueLen = 0;
/**
* The max queue size of probing suspect node.
*/
private int maxSuspectNodesProbeQueueLen;
/** /**
* Connection timeout for probing dead node in milliseconds. * Connection timeout for probing dead node in milliseconds.
*/ */
@ -131,12 +119,12 @@ public class DeadNodeDetector extends Daemon {
/** /**
* The dead node probe queue. * The dead node probe queue.
*/ */
private Queue<DatanodeInfo> deadNodesProbeQueue; private UniqueQueue<DatanodeInfo> deadNodesProbeQueue;
/** /**
* The suspect node probe queue. * The suspect node probe queue.
*/ */
private Queue<DatanodeInfo> suspectNodesProbeQueue; private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue;
/** /**
* The thread pool of probing dead node. * The thread pool of probing dead node.
@ -181,6 +169,32 @@ public class DeadNodeDetector extends Daemon {
INIT, CHECK_DEAD, IDLE, ERROR INIT, CHECK_DEAD, IDLE, ERROR
} }
/**
 * A thread-safe FIFO queue that never holds the same element twice.
 *
 * <p>Elements are handed out in the order they were accepted. A companion
 * set mirrors the queue contents so that duplicates (as defined by
 * {@code equals}/{@code hashCode}) are rejected in constant time. Every
 * method is {@code synchronized} on the queue instance, so the queue and
 * the set are always updated together.
 *
 * @param <T> the element type
 */
static class UniqueQueue<T> {
  /** Elements in arrival order; the head is the next one to be polled. */
  private Deque<T> queue = new LinkedList<>();
  /** Mirror of {@link #queue} used only for the duplicate check. */
  private Set<T> set = new HashSet<>();

  /**
   * Appends the element to the tail unless an equal element is already
   * queued.
   *
   * @param dn the element to enqueue
   * @return {@code true} if the element was added, {@code false} if an
   *         equal element is already waiting in the queue
   */
  synchronized boolean offer(T dn) {
    if (!set.add(dn)) {
      // Already queued: leave the existing entry where it is.
      return false;
    }
    queue.addLast(dn);
    return true;
  }

  /**
   * Removes and returns the head of the queue.
   *
   * @return the oldest queued element, or {@code null} if the queue is
   *         empty
   */
  synchronized T poll() {
    T head = queue.pollFirst();
    // Forget the element so that it may be offered again later.
    set.remove(head);
    return head;
  }

  /**
   * @return the number of elements currently queued
   */
  synchronized int size() {
    return set.size();
  }
}
/** /**
* Disabled start probe suspect/dead thread for the testing. * Disabled start probe suspect/dead thread for the testing.
*/ */
@ -203,20 +217,14 @@ public class DeadNodeDetector extends Daemon {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT); DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_DEFAULT);
socketTimeout = socketTimeout =
conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT); conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
maxDeadNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
maxSuspectNodesProbeQueueLen =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT);
probeConnectionTimeoutMs = conf.getLong( probeConnectionTimeoutMs = conf.getLong(
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT); DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
this.deadNodesProbeQueue = new UniqueQueue<>();
this.suspectNodesProbeQueue = new UniqueQueue<>();
this.deadNodesProbeQueue = idleSleepMs = conf.getLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY,
new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen); DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT);
this.suspectNodesProbeQueue =
new ArrayBlockingQueue<DatanodeInfo>(maxSuspectNodesProbeQueueLen);
int deadNodeDetectDeadThreads = int deadNodeDetectDeadThreads =
conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY, conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
@ -447,8 +455,7 @@ public class DeadNodeDetector extends Daemon {
for (DatanodeInfo datanodeInfo : datanodeInfos) { for (DatanodeInfo datanodeInfo : datanodeInfos) {
if (!deadNodesProbeQueue.offer(datanodeInfo)) { if (!deadNodesProbeQueue.offer(datanodeInfo)) {
LOG.debug("Skip to add dead node {} to check " + LOG.debug("Skip to add dead node {} to check " +
"since the probe queue is full.", datanodeInfo); "since the node is already in the probe queue.", datanodeInfo);
break;
} else { } else {
LOG.debug("Add dead node to check: {}.", datanodeInfo); LOG.debug("Add dead node to check: {}.", datanodeInfo);
} }
@ -458,7 +465,7 @@ public class DeadNodeDetector extends Daemon {
private void idle() { private void idle() {
try { try {
Thread.sleep(IDLE_SLEEP_MS); Thread.sleep(idleSleepMs);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Got interrupted while DeadNodeDetector is idle.", e); LOG.debug("Got interrupted while DeadNodeDetector is idle.", e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -483,14 +490,24 @@ public class DeadNodeDetector extends Daemon {
deadNodes.remove(datanodeInfo.getDatanodeUuid()); deadNodes.remove(datanodeInfo.getDatanodeUuid());
} }
public Queue<DatanodeInfo> getDeadNodesProbeQueue() { public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() {
return deadNodesProbeQueue; return deadNodesProbeQueue;
} }
public Queue<DatanodeInfo> getSuspectNodesProbeQueue() { public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() {
return suspectNodesProbeQueue; return suspectNodesProbeQueue;
} }
/**
 * Replaces the suspect node probe queue with the given instance.
 * Intended for tests only, e.g. to install an instrumented queue.
 *
 * @param queue the queue to use for suspect node probing from now on
 */
@VisibleForTesting
void setSuspectQueue(UniqueQueue<DatanodeInfo> queue) {
  suspectNodesProbeQueue = queue;
}
/**
 * Replaces the dead node probe queue with the given instance.
 * Intended for tests only, e.g. to install an instrumented queue.
 *
 * @param queue the queue to use for dead node probing from now on
 */
@VisibleForTesting
void setDeadQueue(UniqueQueue<DatanodeInfo> queue) {
  deadNodesProbeQueue = queue;
}
/** /**
* Add datanode to suspectNodes and suspectAndDeadNodes. * Add datanode to suspectNodes and suspectAndDeadNodes.
*/ */

View File

@ -161,13 +161,9 @@ public interface HdfsClientConfigKeys {
"dfs.client.deadnode.detection.enabled"; "dfs.client.deadnode.detection.enabled";
boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false; boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY = String DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY =
"dfs.client.deadnode.detection.deadnode.queue.max"; "dfs.client.deadnode.detection.idle.sleep.ms";
int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100; long DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_DEFAULT = 10000;
String DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY =
"dfs.client.deadnode.detection.suspectnode.queue.max";
int DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_DEFAULT = 1000;
String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY = String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
"dfs.client.deadnode.detection.probe.connection.timeout.ms"; "dfs.client.deadnode.detection.probe.connection.timeout.ms";

View File

@ -3119,22 +3119,6 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.deadnode.detection.deadnode.queue.max</name>
<value>100</value>
<description>
The max queue size of probing dead node.
</description>
</property>
<property>
<name>dfs.client.deadnode.detection.suspectnode.queue.max</name>
<value>1000</value>
<description>
The max queue size of probing suspect node.
</description>
</property>
<property> <property>
<name>dfs.client.deadnode.detection.probe.deadnode.threads</name> <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
<value>10</value> <value>10</value>
@ -3143,6 +3127,14 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.deadnode.detection.idle.sleep.ms</name>
<value>10000</value>
<description>
The time, in milliseconds, that DeadNodeDetector sleeps each time it is in the idle state.
</description>
</property>
<property> <property>
<name>dfs.client.deadnode.detection.probe.suspectnode.threads</name> <name>dfs.client.deadnode.detection.probe.suspectnode.threads</name>
<value>10</value> <value>10</value>

View File

@ -30,19 +30,20 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
@ -73,6 +74,7 @@ public class TestDeadNodeDetection {
DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
1000); 1000);
conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0); conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 100);
} }
@After @After
@ -247,42 +249,63 @@ public class TestDeadNodeDetection {
} }
@Test @Test
public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception { public void testDeadNodeDetectionDeadNodeProbe() throws Exception {
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1); FileSystem fs = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); FSDataInputStream in = null;
cluster.waitActive(); Path filePath = new Path("/" + GenericTestUtils.getMethodName());
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testDeadNodeDetectionMaxDeadNodesProbeQueue");
createFile(fs, filePath);
// Remove three DNs,
cluster.stopDataNode(0);
cluster.stopDataNode(0);
cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
DFSInputStream din = (DFSInputStream) in.getWrappedStream();
DFSClient dfsClient = din.getDFSClient();
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
fs = cluster.getFileSystem();
createFile(fs, filePath);
// Remove three DNs,
cluster.stopDataNode(0);
cluster.stopDataNode(0);
cluster.stopDataNode(0);
in = fs.open(filePath);
DFSInputStream din = (DFSInputStream) in.getWrappedStream();
DFSClient dfsClient = din.getDFSClient();
DeadNodeDetector deadNodeDetector =
dfsClient.getClientContext().getDeadNodeDetector();
// Spy suspect queue and dead queue.
DeadNodeDetector.UniqueQueue<DatanodeInfo> queue =
deadNodeDetector.getSuspectNodesProbeQueue();
DeadNodeDetector.UniqueQueue<DatanodeInfo> suspectSpy =
Mockito.spy(queue);
deadNodeDetector.setSuspectQueue(suspectSpy);
queue = deadNodeDetector.getDeadNodesProbeQueue();
DeadNodeDetector.UniqueQueue<DatanodeInfo> deadSpy = Mockito.spy(queue);
deadNodeDetector.setDeadQueue(deadSpy);
// Trigger dead node detection.
try { try {
in.read(); in.read();
} catch (BlockMissingException e) { } catch (BlockMissingException e) {
} }
Thread.sleep(1500); Thread.sleep(1500);
Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector() Collection<DatanodeInfo> deadNodes =
.getDeadNodesProbeQueue().size() dfsClient.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
+ dfsClient.getDeadNodes(din).size()) <= 4); assertEquals(3, deadNodes.size());
for (DatanodeInfo dead : deadNodes) {
// Each node is suspected once then marked as dead.
Mockito.verify(suspectSpy, Mockito.times(1)).offer(dead);
// All the dead nodes should be scheduled and probed at least once.
Mockito.verify(deadSpy, Mockito.atLeastOnce()).offer(dead);
Mockito.verify(deadSpy, Mockito.atLeastOnce()).poll();
}
} finally { } finally {
in.close(); if (in != null) {
in.close();
}
deleteFile(fs, filePath); deleteFile(fs, filePath);
} }
} }
@Test @Test
public void testDeadNodeDetectionSuspectNode() throws Exception { public void testDeadNodeDetectionSuspectNode() throws Exception {
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
DeadNodeDetector.setDisabledProbeThreadForTest(true); DeadNodeDetector.setDisabledProbeThreadForTest(true);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();