HDFS-14651. DeadNodeDetector checks dead node periodically. Contributed by Lisheng Sun.

Yiqun Lin 2019-11-22 10:53:55 +08:00
parent b89fd4dfe9
commit 9b6906fe91
5 changed files with 504 additions and 22 deletions

ClientContext.java

@@ -150,7 +150,7 @@ public class ClientContext {
         conf.getWriteByteArrayManagerConf());
     this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
     if (deadNodeDetectionEnabled && deadNodeDetector == null) {
-      deadNodeDetector = new DeadNodeDetector(name);
+      deadNodeDetector = new DeadNodeDetector(name, config);
       deadNodeDetectorThr = new Daemon(deadNodeDetector);
       deadNodeDetectorThr.start();
     }
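
As a usage note (not part of this commit): the detector is wired up purely through client configuration. A minimal sketch, assuming a locally reachable NameNode; the URI and path are placeholders, while both configuration keys are the ones this commit introduces or relies on (see HdfsClientConfigKeys below):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class EnableDeadNodeDetection {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn on the shared DeadNodeDetector for this client context
        // (the default is false).
        conf.setBoolean("dfs.client.deadnode.detection.enabled", true);
        // Probe suspect dead nodes every 10s instead of the default 60s.
        conf.setLong(
            "dfs.client.deadnode.detection.probe.deadnode.interval.ms", 10000L);

        // hdfs://localhost:8020 and /tmp/demo are placeholders for a real
        // cluster address and file.
        try (FileSystem fs =
            FileSystem.get(URI.create("hdfs://localhost:8020"), conf)) {
          fs.open(new Path("/tmp/demo")).close();
        }
      }
    }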

DeadNodeDetector.java

@@ -17,13 +17,40 @@
  */
 package org.apache.hadoop.hdfs;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.HashSet;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;

 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;

 /**
  * Detect the dead nodes in advance, and share this information among all the
@@ -48,10 +75,12 @@ public class DeadNodeDetector implements Runnable {
    */
   private String name;

   private Configuration conf;

   /**
    * Dead nodes shared by all the DFSInputStreams of the client.
    */
-  private final ConcurrentHashMap<String, DatanodeInfo> deadNodes;
+  private final Map<String, DatanodeInfo> deadNodes;

   /**
    * Record dead nodes by one DFSInputStream. When dead node is not used by one

@@ -59,9 +88,66 @@ public class DeadNodeDetector implements Runnable {
    * DFSInputStream does not include any dead node, remove DFSInputStream from
    * dfsInputStreamNodes.
    */
-  private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>
-      dfsInputStreamNodes;
+  private final Map<DFSInputStream, HashSet<DatanodeInfo>>
+      dfsInputStreamNodes;
  /**
   * Datanodes that are being probed.
   */
  private Map<String, DatanodeInfo> probeInProg =
      new ConcurrentHashMap<String, DatanodeInfo>();

  /**
   * The last time when the dead nodes were detected.
   */
  private long lastDetectDeadTS = 0;

  /**
   * Interval in milliseconds between dead node probe runs.
   */
  private long deadNodeDetectInterval = 0;

  /**
   * The maximum queue size for probing dead nodes.
   */
  private int maxDeadNodesProbeQueueLen = 0;

  /**
   * Connection timeout in milliseconds when probing a dead node.
   */
  private long probeConnectionTimeoutMs;

  /**
   * The dead node probe queue.
   */
  private Queue<DatanodeInfo> deadNodesProbeQueue;

  /**
   * The thread pool for probing dead nodes.
   */
  private ExecutorService probeDeadNodesThreadPool;

  /**
   * The scheduler thread for dead node probing.
   */
  private Thread probeDeadNodesSchedulerThr;

  /**
   * The thread pool for the datanodes' RPC requests. Sometimes a datanode can
   * hang and fail to respond to the client quickly; without this separate
   * pool, such nodes would fill up the probe thread pool and block probing of
   * the other, normal nodes.
   */
  private ExecutorService rpcThreadPool;

  private int socketTimeout;

  /**
   * The type of probe.
   */
  private enum ProbeType {
    CHECK_DEAD
  }
  /**
   * The state of DeadNodeDetector.
   */

@@ -71,12 +157,40 @@ public class DeadNodeDetector implements Runnable {
   private State state;

-  public DeadNodeDetector(String name) {
+  public DeadNodeDetector(String name, Configuration conf) {
+    this.conf = new Configuration(conf);
     this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
     this.dfsInputStreamNodes =
         new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
     this.name = name;

     deadNodeDetectInterval = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT);
     socketTimeout =
         conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsConstants.READ_TIMEOUT);
     maxDeadNodesProbeQueueLen =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY,
             DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT);
     probeConnectionTimeoutMs = conf.getLong(
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);

     this.deadNodesProbeQueue =
         new ArrayBlockingQueue<DatanodeInfo>(maxDeadNodesProbeQueueLen);

     int deadNodeDetectDeadThreads =
         conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY,
             DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT);
     int rpcThreads = conf.getInt(DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY,
         DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT);
     probeDeadNodesThreadPool = Executors.newFixedThreadPool(
         deadNodeDetectDeadThreads, new Daemon.DaemonFactory());
     rpcThreadPool =
         Executors.newFixedThreadPool(rpcThreads, new Daemon.DaemonFactory());

     startProbeScheduler();

     LOG.info("Start dead node detector for DFSClient {}.", this.name);
     state = State.INIT;
   }
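
A side note on the queue choice above: deadNodesProbeQueue is a bounded ArrayBlockingQueue, whose non-blocking offer() returns false when the queue is full rather than stalling the detector thread; checkDeadNodes() below relies on exactly that. A tiny standalone illustration (the capacity of 2 is hypothetical; the shipped default is 100):

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;

    public class BoundedProbeQueueDemo {
      public static void main(String[] args) {
        // Hypothetical capacity of 2; the commit's default
        // (dfs.client.deadnode.detection.deadnode.queue.max) is 100.
        Queue<String> probeQueue = new ArrayBlockingQueue<>(2);
        System.out.println(probeQueue.offer("dn-1")); // true
        System.out.println(probeQueue.offer("dn-2")); // true
        // Queue full: offer() returns false instead of blocking, so the
        // detector can simply stop enqueueing for this round.
        System.out.println(probeQueue.offer("dn-3")); // false
      }
    }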
@@ -91,6 +205,9 @@ public class DeadNodeDetector implements Runnable {
       case INIT:
         init();
         break;
       case CHECK_DEAD:
         checkDeadNodes();
         break;
       case IDLE:
         idle();
         break;

@@ -106,6 +223,134 @@ public class DeadNodeDetector implements Runnable {
     }
   }
  /**
   * Start the probe scheduler thread.
   */
  private void startProbeScheduler() {
    probeDeadNodesSchedulerThr =
        new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
    probeDeadNodesSchedulerThr.setDaemon(true);
    probeDeadNodesSchedulerThr.start();
  }

  /**
   * Probe the datanodes queued for the given probe type.
   */
  private void scheduleProbe(ProbeType type) {
    LOG.debug("Schedule probe datanode for probe type: {}.", type);
    DatanodeInfo datanodeInfo = null;
    if (type == ProbeType.CHECK_DEAD) {
      while ((datanodeInfo = deadNodesProbeQueue.poll()) != null) {
        if (probeInProg.containsKey(datanodeInfo.getDatanodeUuid())) {
          LOG.debug("The datanode {} is already being probed, skip adding " +
              "it.", datanodeInfo);
          continue;
        }
        probeInProg.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
        Probe probe = new Probe(this, datanodeInfo, ProbeType.CHECK_DEAD);
        probeDeadNodesThreadPool.execute(probe);
      }
    }
  }
  /**
   * Probe the datanode through an RPC request, and determine the datanode
   * status based on the returned result.
   */
  class Probe implements Runnable {
    private DeadNodeDetector deadNodeDetector;
    private DatanodeInfo datanodeInfo;
    private ProbeType type;

    Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo,
        ProbeType type) {
      this.deadNodeDetector = deadNodeDetector;
      this.datanodeInfo = datanodeInfo;
      this.type = type;
    }

    public DatanodeInfo getDatanodeInfo() {
      return datanodeInfo;
    }

    public ProbeType getType() {
      return type;
    }

    @Override
    public void run() {
      LOG.debug("Check node: {}, type: {}.", datanodeInfo, type);
      try {
        final ClientDatanodeProtocol proxy =
            DFSUtilClient.createClientDatanodeProtocolProxy(datanodeInfo,
                deadNodeDetector.conf, socketTimeout, true);
        // Run the RPC on the separate rpcThreadPool so that a hanging
        // datanode only ties up an rpc thread, not a probe thread.
        Future<DatanodeLocalInfo> future =
            rpcThreadPool.submit(new Callable<DatanodeLocalInfo>() {
              @Override
              public DatanodeLocalInfo call() throws Exception {
                return proxy.getDatanodeInfo();
              }
            });
        try {
          future.get(probeConnectionTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
              e);
          deadNodeDetector.probeCallBack(this, false);
          return;
        } finally {
          future.cancel(true);
        }
        deadNodeDetector.probeCallBack(this, true);
        return;
      } catch (Exception e) {
        LOG.error("Probe failed, datanode: {}, type: {}.", datanodeInfo, type,
            e);
      }

      deadNodeDetector.probeCallBack(this, false);
    }
  }
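
The pattern in Probe#run above — submit the RPC to a second pool, bound it with Future#get(timeout), and cancel in a finally block — is what keeps a hung datanode from blocking the probe thread itself. A self-contained sketch of the same pattern, with a sleeping task standing in for proxy.getDatanodeInfo() and hypothetical timeouts:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class BoundedRpcDemo {
      public static void main(String[] args) throws Exception {
        ExecutorService rpcPool = Executors.newFixedThreadPool(2);
        // Stand-in for proxy.getDatanodeInfo(): a call that may hang.
        Future<String> future = rpcPool.submit(() -> {
          Thread.sleep(5_000); // simulate an unresponsive datanode
          return "datanode info";
        });
        try {
          // Plays the role of probeConnectionTimeoutMs (default 20000 ms;
          // 1000 ms here just to make the demo fail fast).
          String reply = future.get(1_000, TimeUnit.MILLISECONDS);
          System.out.println("Probe succeeded: " + reply);
        } catch (TimeoutException e) {
          System.out.println("Probe timed out; the node stays dead.");
        } finally {
          future.cancel(true); // frees the rpc thread, as Probe#run does
          rpcPool.shutdown();
        }
      }
    }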
  /**
   * Handle the probe result. When the ProbeType is CHECK_DEAD and the probe
   * succeeds, remove the datanode from DeadNodeDetector#deadNodes.
   */
  private void probeCallBack(Probe probe, boolean success) {
    LOG.debug("Probe datanode: {} result: {}, type: {}",
        probe.getDatanodeInfo(), success, probe.getType());
    probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
    if (success) {
      if (probe.getType() == ProbeType.CHECK_DEAD) {
        LOG.info("Remove the node from dead node list: {}. ",
            probe.getDatanodeInfo());
        removeNodeFromDeadNode(probe.getDatanodeInfo());
      }
    }
  }
  /**
   * Check dead nodes periodically.
   */
  private void checkDeadNodes() {
    long ts = Time.monotonicNow();
    if (ts - lastDetectDeadTS > deadNodeDetectInterval) {
      Set<DatanodeInfo> datanodeInfos = clearAndGetDetectedDeadNodes();
      for (DatanodeInfo datanodeInfo : datanodeInfos) {
        LOG.debug("Add dead node to check: {}.", datanodeInfo);
        if (!deadNodesProbeQueue.offer(datanodeInfo)) {
          LOG.debug("Skip adding dead node {} to check " +
              "since the probe queue is full.", datanodeInfo);
          break;
        }
      }
      lastDetectDeadTS = ts;
    }

    state = State.IDLE;
  }
  private void idle() {
    try {
      Thread.sleep(IDLE_SLEEP_MS);

@@ -113,11 +358,11 @@ public class DeadNodeDetector implements Runnable {
     }

-    state = State.IDLE;
+    state = State.CHECK_DEAD;
   }

   private void init() {
-    state = State.IDLE;
+    state = State.CHECK_DEAD;
   }
  private void addToDead(DatanodeInfo datanodeInfo) {

@@ -128,6 +373,14 @@ public class DeadNodeDetector implements Runnable {
     return deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
   }

   private void removeFromDead(DatanodeInfo datanodeInfo) {
     deadNodes.remove(datanodeInfo.getDatanodeUuid());
   }

   public Queue<DatanodeInfo> getDeadNodesProbeQueue() {
     return deadNodesProbeQueue;
   }
  /**
   * Add datanode to deadNodes and dfsInputStreamNodes. The node is considered
   * a dead node. The dead node is shared by all the DFSInputStreams in the

@@ -182,4 +435,49 @@ public class DeadNodeDetector implements Runnable {
       }
     }
   }
  /**
   * Remove a dead node from dfsInputStreamNodes#dfsInputStream and deadNodes.
   */
  public synchronized void removeNodeFromDeadNode(DatanodeInfo datanodeInfo) {
    for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry :
        dfsInputStreamNodes.entrySet()) {
      Set<DatanodeInfo> datanodeInfos = entry.getValue();
      if (datanodeInfos.remove(datanodeInfo)) {
        DFSInputStream dfsInputStream = entry.getKey();
        dfsInputStream.removeFromLocalDeadNodes(datanodeInfo);
      }
    }

    removeFromDead(datanodeInfo);
  }

  private static void probeSleep(long time) {
    try {
      Thread.sleep(time);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Schedule datanode probes periodically.
   */
  static class ProbeScheduler implements Runnable {
    private DeadNodeDetector deadNodeDetector;
    private ProbeType type;

    ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType type) {
      this.deadNodeDetector = deadNodeDetector;
      this.type = type;
    }

    @Override
    public void run() {
      while (true) {
        deadNodeDetector.scheduleProbe(type);
        probeSleep(deadNodeDetector.deadNodeDetectInterval);
      }
    }
  }
}
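
Putting the new transitions together: the State enum itself falls outside the diff hunks, but from init(), checkDeadNodes() and idle() above, the detector now cycles INIT → CHECK_DEAD → IDLE → CHECK_DEAD → …. A toy walk-through of that cycle; note the three-value enum is an assumption, since no other states are visible in this commit:

    public class DetectorStateDemo {
      // Assumed subset of DeadNodeDetector.State: only these three values
      // appear in the commit's hunks.
      enum State { INIT, CHECK_DEAD, IDLE }

      public static void main(String[] args) {
        State state = State.INIT;
        for (int i = 0; i < 5; i++) {
          System.out.println("state = " + state);
          switch (state) {
            case INIT:       state = State.CHECK_DEAD; break; // init()
            case CHECK_DEAD: state = State.IDLE;       break; // checkDeadNodes()
            case IDLE:       state = State.CHECK_DEAD; break; // idle()
          }
        }
      }
    }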

HdfsClientConfigKeys.java

@@ -156,6 +156,28 @@ public interface HdfsClientConfigKeys {
       "dfs.client.deadnode.detection.enabled";
   boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;

   String DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY =
       "dfs.client.deadnode.detection.deadnode.queue.max";
   int DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_DEFAULT = 100;

   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY =
       "dfs.client.deadnode.detection.probe.connection.timeout.ms";
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT =
       20000;

   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY =
       "dfs.client.deadnode.detection.probe.deadnode.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_DEFAULT = 10;

   String DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY =
       "dfs.client.deadnode.detection.rpc.threads";
   int DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_DEFAULT = 20;

   String DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY =
       "dfs.client.deadnode.detection.probe.deadnode.interval.ms";
   long DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_DEFAULT =
       60 * 1000; // 60s

   String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
       "dfs.datanode.kerberos.principal";
   String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";

hdfs-default.xml

@@ -2996,6 +2996,46 @@
   </description>
 </property>

 <property>
   <name>dfs.client.deadnode.detection.deadnode.queue.max</name>
   <value>100</value>
   <description>
     The maximum queue size for probing dead nodes.
   </description>
 </property>

 <property>
   <name>dfs.client.deadnode.detection.probe.deadnode.threads</name>
   <value>10</value>
   <description>
     The maximum number of threads to use for probing dead nodes.
   </description>
 </property>

 <property>
   <name>dfs.client.deadnode.detection.rpc.threads</name>
   <value>20</value>
   <description>
     The maximum number of threads to use for the RPC calls that recheck
     the liveness of dead nodes.
   </description>
 </property>

 <property>
   <name>dfs.client.deadnode.detection.probe.deadnode.interval.ms</name>
   <value>60000</value>
   <description>
     Interval in milliseconds between dead node probe runs.
   </description>
 </property>

 <property>
   <name>dfs.client.deadnode.detection.probe.connection.timeout.ms</name>
   <value>20000</value>
   <description>
     Connection timeout in milliseconds when probing a dead node.
   </description>
 </property>

 <property>
   <name>dfs.namenode.lease-recheck-interval-ms</name>
   <value>2000</value>

TestDeadNodeDetection.java

@@ -17,19 +17,24 @@
  */
 package org.apache.hadoop.hdfs;

 import com.google.common.base.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;

 import java.io.IOException;

 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
 import static org.junit.Assert.assertEquals;

 /**
@@ -121,21 +126,7 @@ public class TestDeadNodeDetection {
     FileSystem fs = cluster.getFileSystem();
     Path filePath = new Path("/testDeadNodeMultipleDFSInputStream");
-    // 256 bytes data chunk for writes
-    byte[] bytes = new byte[256];
-    for (int index = 0; index < bytes.length; index++) {
-      bytes[index] = '0';
-    }
-
-    // File with a 512 bytes block size
-    FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
-
-    // Write a block to DN (2x256bytes).
-    out.write(bytes);
-    out.write(bytes);
-    out.hflush();
-    out.close();
+    createFile(fs, filePath);

     String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
     FSDataInputStream in1 = fs.open(filePath);
@@ -170,7 +161,7 @@ public class TestDeadNodeDetection {
     } finally {
       in1.close();
       in2.close();
-      fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true);
+      deleteFile(fs, filePath);

       // Check the dead node again here; the dead node is expected to be removed.
       assertEquals(0, dfsClient1.getDeadNodes(din1).size());
       assertEquals(0, dfsClient2.getDeadNodes(din2).size());
@@ -180,4 +171,135 @@ public class TestDeadNodeDetection {
           .clearAndGetDetectedDeadNodes().size());
     }
   }
  @Test
  public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
        1000);
    // We'll be using a 512 bytes block size just for tests,
    // so make sure the checksum size matches it.
    conf.setInt("io.bytes.per.checksum", 512);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();

    FileSystem fs = cluster.getFileSystem();
    Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery");
    createFile(fs, filePath);

    // Stop the three DNs.
    MiniDFSCluster.DataNodeProperties one = cluster.stopDataNode(0);
    cluster.stopDataNode(0);
    cluster.stopDataNode(0);

    FSDataInputStream in = fs.open(filePath);
    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
    DFSClient dfsClient = din.getDFSClient();
    try {
      try {
        in.read();
      } catch (BlockMissingException e) {
        // Expected: all the replicas are offline.
      }

      assertEquals(3, dfsClient.getDeadNodes(din).size());
      assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
          .clearAndGetDetectedDeadNodes().size());

      cluster.restartDataNode(one, true);
      waitForDeadNodeRecovery(din);
      assertEquals(2, dfsClient.getDeadNodes(din).size());
      assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
          .clearAndGetDetectedDeadNodes().size());
    } finally {
      in.close();
      deleteFile(fs, filePath);
      assertEquals(0, dfsClient.getDeadNodes(din).size());
      assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
          .clearAndGetDetectedDeadNodes().size());
    }
  }
  @Test
  public void testDeadNodeDetectionMaxDeadNodesProbeQueue() throws Exception {
    conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
    conf.setLong(DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
        1000);
    conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY, 1);
    // We'll be using a 512 bytes block size just for tests,
    // so make sure the checksum size matches it.
    conf.setInt("io.bytes.per.checksum", 512);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();

    FileSystem fs = cluster.getFileSystem();
    Path filePath = new Path("testDeadNodeDetectionMaxDeadNodesProbeQueue");
    createFile(fs, filePath);

    // Stop the three DNs.
    cluster.stopDataNode(0);
    cluster.stopDataNode(0);
    cluster.stopDataNode(0);

    FSDataInputStream in = fs.open(filePath);
    DFSInputStream din = (DFSInputStream) in.getWrappedStream();
    DFSClient dfsClient = din.getDFSClient();
    try {
      try {
        in.read();
      } catch (BlockMissingException e) {
        // Expected: all the replicas are offline.
      }

      Thread.sleep(1500);
      Assert.assertTrue((dfsClient.getClientContext().getDeadNodeDetector()
          .getDeadNodesProbeQueue().size()
          + dfsClient.getDeadNodes(din).size()) <= 4);
    } finally {
      in.close();
      deleteFile(fs, filePath);
    }
  }
  private void createFile(FileSystem fs, Path filePath) throws IOException {
    FSDataOutputStream out = null;
    try {
      // 256 bytes data chunk for writes
      byte[] bytes = new byte[256];
      for (int index = 0; index < bytes.length; index++) {
        bytes[index] = '0';
      }

      // File with a 512 bytes block size
      out = fs.create(filePath, true, 4096, (short) 3, 512);

      // Write a block to all 3 DNs (2x256bytes).
      out.write(bytes);
      out.write(bytes);
      out.hflush();
    } finally {
      // Guard against fs.create() having thrown before out was assigned.
      if (out != null) {
        out.close();
      }
    }
  }
  private void deleteFile(FileSystem fs, Path filePath) throws IOException {
    fs.delete(filePath, true);
  }

  private void waitForDeadNodeRecovery(DFSInputStream din) throws Exception {
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        try {
          if (din.getDFSClient().getDeadNodes(din).size() == 2) {
            return true;
          }
        } catch (Exception e) {
          // Ignore the exception
        }
        return false;
      }
    }, 5000, 100000);
  }
}