HDFS-14648. Implement DeadNodeDetector basic model. Contributed by Lisheng Sun.

This commit is contained in:
Yiqun Lin 2019-11-16 11:32:41 +08:00
parent 67f2c491fe
commit b3119b9ab6
9 changed files with 616 additions and 45 deletions

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping; import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -118,6 +119,19 @@ public class ClientContext {
private NodeBase clientNode; private NodeBase clientNode;
private boolean topologyResolutionEnabled; private boolean topologyResolutionEnabled;
private Daemon deadNodeDetectorThr = null;
/**
* The switch to DeadNodeDetector.
*/
private boolean deadNodeDetectionEnabled = false;
/**
* Detect the dead datanodes in advance, and share this information among all
* the DFSInputStreams in the same client.
*/
private DeadNodeDetector deadNodeDetector = null;
private ClientContext(String name, DfsClientConf conf, private ClientContext(String name, DfsClientConf conf,
Configuration config) { Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf(); final ShortCircuitConf scConf = conf.getShortCircuitConf();
@ -134,6 +148,12 @@ private ClientContext(String name, DfsClientConf conf,
this.byteArrayManager = ByteArrayManager.newInstance( this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf()); conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
if (deadNodeDetectionEnabled && deadNodeDetector == null) {
deadNodeDetector = new DeadNodeDetector(name);
deadNodeDetectorThr = new Daemon(deadNodeDetector);
deadNodeDetectorThr.start();
}
initTopologyResolution(config); initTopologyResolution(config);
} }
@ -251,4 +271,33 @@ public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException {
datanodeInfo.getNetworkLocation()); datanodeInfo.getNetworkLocation());
return NetworkTopology.getDistanceByPath(clientNode, node); return NetworkTopology.getDistanceByPath(clientNode, node);
} }
/**
* The switch to DeadNodeDetector. If true, DeadNodeDetector is available.
*/
public boolean isDeadNodeDetectionEnabled() {
return deadNodeDetectionEnabled;
}
/**
* Obtain DeadNodeDetector of the current client.
*/
public DeadNodeDetector getDeadNodeDetector() {
return deadNodeDetector;
}
/**
* Close dead node detector thread.
*/
public void stopDeadNodeDetectorThread() {
if (deadNodeDetectorThr != null) {
deadNodeDetectorThr.interrupt();
try {
deadNodeDetectorThr.join(3000);
} catch (InterruptedException e) {
LOG.warn("Encountered exception while waiting to join on dead " +
"node detector thread.", e);
}
}
}
} }

View File

@ -44,6 +44,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -631,6 +633,8 @@ public synchronized void close() throws IOException {
// lease renewal stops when all files are closed // lease renewal stops when all files are closed
closeAllFilesBeingWritten(false); closeAllFilesBeingWritten(false);
clientRunning = false; clientRunning = false;
// close dead node detector thread
clientContext.stopDeadNodeDetectorThread();
// close connections to the namenode // close connections to the namenode
closeConnectionToNamenode(); closeConnectionToNamenode();
} }
@ -3226,4 +3230,98 @@ public HAServiceProtocol.HAServiceState getHAServiceState()
throws IOException { throws IOException {
return namenode.getHAServiceState(); return namenode.getHAServiceState();
} }
/**
* If deadNodeDetectionEnabled is true, return the dead nodes that detected by
* all the DFSInputStreams in the same client. Otherwise return the dead nodes
* that detected by given DFSInputStream.
*/
public ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getDeadNodes(
DFSInputStream dfsInputStream) {
if (clientContext.isDeadNodeDetectionEnabled()) {
ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
if (dfsInputStream != null) {
deadNodes.putAll(dfsInputStream.getLocalDeadNodes());
}
Set<DatanodeInfo> detectDeadNodes =
clientContext.getDeadNodeDetector().clearAndGetDetectedDeadNodes();
for (DatanodeInfo detectDeadNode : detectDeadNodes) {
deadNodes.put(detectDeadNode, detectDeadNode);
}
return deadNodes;
} else {
return dfsInputStream.getLocalDeadNodes();
}
}
/**
* If deadNodeDetectionEnabled is true, judgement based on whether this
* datanode is included or not in DeadNodeDetector. Otherwise judgment based
* given DFSInputStream.
*/
public boolean isDeadNode(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
if (isDeadNodeDetectionEnabled()) {
boolean isDeadNode =
clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo);
if (dfsInputStream != null) {
isDeadNode = isDeadNode
|| dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
}
return isDeadNode;
} else {
return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo);
}
}
/**
* Add given datanode in DeadNodeDetector.
*/
public void addNodeToDeadNodeDetector(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
if (!isDeadNodeDetectionEnabled()) {
LOG.debug("DeadNode detection is not enabled, skip to add node {}.",
datanodeInfo);
return;
}
clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream,
datanodeInfo);
}
/**
* Remove given datanode from DeadNodeDetector.
*/
public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
if (!isDeadNodeDetectionEnabled()) {
LOG.debug("DeadNode detection is not enabled, skip to remove node {}.",
datanodeInfo);
return;
}
clientContext.getDeadNodeDetector()
.removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
}
/**
* Remove datanodes that given block placed on from DeadNodeDetector.
*/
public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream,
LocatedBlocks locatedBlocks) {
if (!isDeadNodeDetectionEnabled() || locatedBlocks == null) {
LOG.debug("DeadNode detection is not enabled or given block {} " +
"is null, skip to remove node.", locatedBlocks);
return;
}
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) {
removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo);
}
}
}
private boolean isDeadNodeDetectionEnabled() {
return clientContext.isDeadNodeDetectionEnabled();
}
} }

View File

@ -171,10 +171,26 @@ public class DFSInputStream extends FSInputStream
private byte[] oneByteBuf; // used for 'int read()' private byte[] oneByteBuf; // used for 'int read()'
void addToDeadNodes(DatanodeInfo dnInfo) { protected void addToLocalDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo); deadNodes.put(dnInfo, dnInfo);
} }
protected void removeFromLocalDeadNodes(DatanodeInfo dnInfo) {
deadNodes.remove(dnInfo);
}
protected ConcurrentHashMap<DatanodeInfo, DatanodeInfo> getLocalDeadNodes() {
return deadNodes;
}
private void clearLocalDeadNodes() {
deadNodes.clear();
}
protected DFSClient getDFSClient() {
return dfsClient;
}
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
LocatedBlocks locatedBlocks) throws IOException { LocatedBlocks locatedBlocks) throws IOException {
this.dfsClient = dfsClient; this.dfsClient = dfsClient;
@ -612,7 +628,8 @@ private synchronized DatanodeInfo blockSeekTo(long target)
+ "{}, add to deadNodes and continue. ", targetAddr, src, + "{}, add to deadNodes and continue. ", targetAddr, src,
targetBlock.getBlock(), ex); targetBlock.getBlock(), ex);
// Put chosen node into dead list, continue // Put chosen node into dead list, continue
addToDeadNodes(chosenNode); addToLocalDeadNodes(chosenNode);
dfsClient.addNodeToDeadNodeDetector(this, chosenNode);
} }
} }
} }
@ -663,28 +680,40 @@ protected BlockReader getBlockReader(LocatedBlock targetBlock,
*/ */
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (!closed.compareAndSet(false, true)) { try {
DFSClient.LOG.debug("DFSInputStream has been closed already"); if (!closed.compareAndSet(false, true)) {
return; DFSClient.LOG.debug("DFSInputStream has been closed already");
} return;
dfsClient.checkOpen(); }
dfsClient.checkOpen();
if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
final StringBuilder builder = new StringBuilder(); final StringBuilder builder = new StringBuilder();
extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() { extendedReadBuffers
private String prefix = ""; .visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
@Override private String prefix = "";
public void accept(ByteBuffer k, Object v) {
builder.append(prefix).append(k); @Override
prefix = ", "; public void accept(ByteBuffer k, Object v) {
} builder.append(prefix).append(k);
}); prefix = ", ";
DFSClient.LOG.warn("closing file " + src + ", but there are still " + }
"unreleased ByteBuffers allocated by read(). " + });
"Please release " + builder.toString() + "."); DFSClient.LOG.warn("closing file " + src + ", but there are still "
+ "unreleased ByteBuffers allocated by read(). "
+ "Please release " + builder.toString() + ".");
}
closeCurrentBlockReaders();
super.close();
} finally {
/**
* If dfsInputStream is closed and datanode is in
* DeadNodeDetector#dfsInputStreamNodes, we need remove the datanode from
* the DeadNodeDetector#dfsInputStreamNodes. Since user should not use
* this dfsInputStream anymore.
*/
dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks);
} }
closeCurrentBlockReaders();
super.close();
} }
@Override @Override
@ -741,7 +770,8 @@ private synchronized int readBuffer(ReaderStrategy reader, int len,
*/ */
sourceFound = seekToBlockSource(pos); sourceFound = seekToBlockSource(pos);
} else { } else {
addToDeadNodes(currentNode); addToLocalDeadNodes(currentNode);
dfsClient.addNodeToDeadNodeDetector(this, currentNode);
sourceFound = seekToNewSource(pos); sourceFound = seekToNewSource(pos);
} }
if (!sourceFound) { if (!sourceFound) {
@ -801,7 +831,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
} }
blockEnd = -1; blockEnd = -1;
if (currentNode != null) { if (currentNode != null) {
addToDeadNodes(currentNode); addToLocalDeadNodes(currentNode);
dfsClient.addNodeToDeadNodeDetector(this, currentNode);
} }
if (--retries == 0) { if (--retries == 0) {
throw e; throw e;
@ -883,7 +914,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
private LocatedBlock refetchLocations(LocatedBlock block, private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes); dfsClient.getDeadNodes(this), ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src; String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo; String description = "Could not obtain block: " + blockInfo;
@ -924,7 +955,7 @@ private LocatedBlock refetchLocations(LocatedBlock block,
throw new InterruptedIOException( throw new InterruptedIOException(
"Interrupted while choosing DataNode for read."); "Interrupted while choosing DataNode for read.");
} }
deadNodes.clear(); //2nd option is to remove only nodes[blockId] clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
openInfo(true); openInfo(true);
block = refreshLocatedBlock(block); block = refreshLocatedBlock(block);
failures++; failures++;
@ -945,7 +976,7 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
StorageType storageType = null; StorageType storageType = null;
if (nodes != null) { if (nodes != null) {
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i]) if (!dfsClient.getDeadNodes(this).containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i]; chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same // Storage types are ordered to correspond with nodes, so use the same
@ -1097,7 +1128,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
DFSClient.LOG.warn(msg); DFSClient.LOG.warn(msg);
// we want to remember what we have tried // we want to remember what we have tried
corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info); corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
addToDeadNodes(datanode.info); addToLocalDeadNodes(datanode.info);
throw new IOException(msg); throw new IOException(msg);
} catch (IOException e) { } catch (IOException e) {
checkInterrupted(e); checkInterrupted(e);
@ -1119,7 +1150,8 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
String msg = "Failed to connect to " + datanode.addr + " for file " String msg = "Failed to connect to " + datanode.addr + " for file "
+ src + " for block " + block.getBlock() + ":" + e; + src + " for block " + block.getBlock() + ":" + e;
DFSClient.LOG.warn("Connection failure: " + msg, e); DFSClient.LOG.warn("Connection failure: " + msg, e);
addToDeadNodes(datanode.info); addToLocalDeadNodes(datanode.info);
dfsClient.addNodeToDeadNodeDetector(this, datanode.info);
throw new IOException(msg); throw new IOException(msg);
} }
// Refresh the block for updated tokens in case of token failures or // Refresh the block for updated tokens in case of token failures or
@ -1522,14 +1554,14 @@ public synchronized boolean seekToNewSource(long targetPos)
if (currentNode == null) { if (currentNode == null) {
return seekToBlockSource(targetPos); return seekToBlockSource(targetPos);
} }
boolean markedDead = deadNodes.containsKey(currentNode); boolean markedDead = dfsClient.isDeadNode(this, currentNode);
addToDeadNodes(currentNode); addToLocalDeadNodes(currentNode);
DatanodeInfo oldNode = currentNode; DatanodeInfo oldNode = currentNode;
DatanodeInfo newNode = blockSeekTo(targetPos); DatanodeInfo newNode = blockSeekTo(targetPos);
if (!markedDead) { if (!markedDead) {
/* remove it from deadNodes. blockSeekTo could have cleared /* remove it from deadNodes. blockSeekTo could have cleared
* deadNodes and added currentNode again. Thats ok. */ * deadNodes and added currentNode again. Thats ok. */
deadNodes.remove(oldNode); removeFromLocalDeadNodes(oldNode);
} }
if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) { if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
currentNode = newNode; currentNode = newNode;

View File

@ -147,10 +147,6 @@ protected String getSrc() {
return src; return src;
} }
protected DFSClient getDFSClient() {
return dfsClient;
}
protected LocatedBlocks getLocatedBlocks() { protected LocatedBlocks getLocatedBlocks() {
return locatedBlocks; return locatedBlocks;
} }
@ -283,7 +279,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
"block" + block.getBlock(), e); "block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved // re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset()); fetchBlockAt(block.getStartOffset());
addToDeadNodes(dnInfo.info); addToLocalDeadNodes(dnInfo.info);
} }
} }
if (reader != null) { if (reader != null) {

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Detect the dead nodes in advance, and share this information among all the
* DFSInputStreams in the same client.
*/
public class DeadNodeDetector implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(DeadNodeDetector.class);
/**
* Waiting time when DeadNodeDetector happens error.
*/
private static final long ERROR_SLEEP_MS = 5000;
/**
* Waiting time when DeadNodeDetector's state is idle.
*/
private static final long IDLE_SLEEP_MS = 10000;
/**
* Client context name.
*/
private String name;
/**
* Dead nodes shared by all the DFSInputStreams of the client.
*/
private final ConcurrentHashMap<String, DatanodeInfo> deadNodes;
/**
* Record dead nodes by one DFSInputStream. When dead node is not used by one
* DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If
* DFSInputStream does not include any dead node, remove DFSInputStream from
* dfsInputStreamNodes.
*/
private final ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>
dfsInputStreamNodes;
/**
* The state of DeadNodeDetector.
*/
private enum State {
INIT, CHECK_DEAD, IDLE, ERROR
}
private State state;
public DeadNodeDetector(String name) {
this.deadNodes = new ConcurrentHashMap<String, DatanodeInfo>();
this.dfsInputStreamNodes =
new ConcurrentHashMap<DFSInputStream, HashSet<DatanodeInfo>>();
this.name = name;
LOG.info("Start dead node detector for DFSClient {}.", this.name);
state = State.INIT;
}
@Override
public void run() {
while (true) {
clearAndGetDetectedDeadNodes();
LOG.debug("Current detector state {}, the detected nodes: {}.", state,
deadNodes.values());
switch (state) {
case INIT:
init();
break;
case IDLE:
idle();
break;
case ERROR:
try {
Thread.sleep(ERROR_SLEEP_MS);
} catch (InterruptedException e) {
}
return;
default:
break;
}
}
}
private void idle() {
try {
Thread.sleep(IDLE_SLEEP_MS);
} catch (InterruptedException e) {
}
state = State.IDLE;
}
private void init() {
state = State.IDLE;
}
private void addToDead(DatanodeInfo datanodeInfo) {
deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
}
public boolean isDeadNode(DatanodeInfo datanodeInfo) {
return deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
}
/**
* Add datanode in deadNodes and dfsInputStreamNodes. The node is considered
* to dead node. The dead node is shared by all the DFSInputStreams in the
* same client.
*/
public synchronized void addNodeToDetect(DFSInputStream dfsInputStream,
DatanodeInfo datanodeInfo) {
HashSet<DatanodeInfo> datanodeInfos =
dfsInputStreamNodes.get(dfsInputStream);
if (datanodeInfos == null) {
datanodeInfos = new HashSet<DatanodeInfo>();
datanodeInfos.add(datanodeInfo);
dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos);
} else {
datanodeInfos.add(datanodeInfo);
}
addToDead(datanodeInfo);
}
/**
* Remove dead node which is not used by any DFSInputStream from deadNodes.
* @return new dead node shared by all DFSInputStreams.
*/
public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
// remove the dead nodes who doesn't have any inputstream first
Set<DatanodeInfo> newDeadNodes = new HashSet<DatanodeInfo>();
for (HashSet<DatanodeInfo> datanodeInfos : dfsInputStreamNodes.values()) {
newDeadNodes.addAll(datanodeInfos);
}
for (DatanodeInfo datanodeInfo : deadNodes.values()) {
if (!newDeadNodes.contains(datanodeInfo)) {
deadNodes.remove(datanodeInfo.getDatanodeUuid());
}
}
return new HashSet<>(deadNodes.values());
}
/**
* Remove dead node from dfsInputStreamNodes#dfsInputStream. If
* dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove
* it from dfsInputStreamNodes.
*/
public synchronized void removeNodeFromDeadNodeDetector(
DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) {
Set<DatanodeInfo> datanodeInfos = dfsInputStreamNodes.get(dfsInputStream);
if (datanodeInfos != null) {
datanodeInfos.remove(datanodeInfo);
if (datanodeInfos.isEmpty()) {
dfsInputStreamNodes.remove(dfsInputStream);
}
}
}
}

View File

@ -152,6 +152,10 @@ public interface HdfsClientConfigKeys {
"dfs.client.block.reader.remote.buffer.size"; "dfs.client.block.reader.remote.buffer.size";
int DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT = 8192; int DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT = 8192;
String DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY =
"dfs.client.deadnode.detection.enabled";
boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false;
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
"dfs.datanode.kerberos.principal"; "dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";

View File

@ -26,14 +26,19 @@
import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
@ -47,6 +52,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT;
@ -61,8 +68,6 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
@ -71,6 +76,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
@ -87,11 +94,6 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* DFSClient configuration. * DFSClient configuration.
*/ */
@ -145,6 +147,8 @@ public class DfsClientConf {
private final boolean dataTransferTcpNoDelay; private final boolean dataTransferTcpNoDelay;
private final boolean deadNodeDetectionEnabled;
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getRpcTimeout(conf); hdfsTimeout = Client.getRpcTimeout(conf);
@ -262,6 +266,10 @@ public DfsClientConf(Configuration conf) {
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
deadNodeDetectionEnabled =
conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY,
DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT);
stripedReadThreadpoolSize = conf.getInt( stripedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT); HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
@ -595,6 +603,13 @@ public int getStripedReadThreadpoolSize() {
return stripedReadThreadpoolSize; return stripedReadThreadpoolSize;
} }
/**
* @return the deadNodeDetectionEnabled
*/
public boolean isDeadNodeDetectionEnabled() {
return deadNodeDetectionEnabled;
}
/** /**
* @return the replicaAccessorBuilderClasses * @return the replicaAccessorBuilderClasses
*/ */

View File

@ -2987,6 +2987,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.deadnode.detection.enabled</name>
<value>false</value>
<description>
Set to true to enable dead node detection in client side. Then all the DFSInputStreams of the same client can
share the dead node information.
</description>
</property>
<property> <property>
<name>dfs.namenode.lease-recheck-interval-ms</name> <name>dfs.namenode.lease-recheck-interval-ms</name>
<value>2000</value> <value>2000</value>

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
/**
* Tests for dead node detection in DFSClient.
*/
public class TestDeadNodeDetection {
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setUp() {
cluster = null;
conf = new HdfsConfiguration();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testDeadNodeDetectionInBackground() throws IOException {
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testDetectDeadNodeInBackground");
// 256 bytes data chunk for writes
byte[] bytes = new byte[256];
for (int index = 0; index < bytes.length; index++) {
bytes[index] = '0';
}
// File with a 512 bytes block size
FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
// Write a block to all 3 DNs (2x256bytes).
out.write(bytes);
out.write(bytes);
out.hflush();
out.close();
// Remove three DNs,
cluster.stopDataNode(0);
cluster.stopDataNode(0);
cluster.stopDataNode(0);
FSDataInputStream in = fs.open(filePath);
DFSInputStream din = null;
DFSClient dfsClient = null;
try {
try {
in.read();
} catch (BlockMissingException e) {
}
din = (DFSInputStream) in.getWrappedStream();
dfsClient = din.getDFSClient();
assertEquals(3, dfsClient.getDeadNodes(din).size());
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
} finally {
in.close();
fs.delete(new Path("/testDetectDeadNodeInBackground"), true);
// check the dead node again here, the dead node is expected be removed
assertEquals(0, dfsClient.getDeadNodes(din).size());
assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
}
}
@Test
public void testDeadNodeDetectionInMultipleDFSInputStream()
throws IOException {
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
// We'll be using a 512 bytes block size just for tests
// so making sure the checksum bytes too match it.
conf.setInt("io.bytes.per.checksum", 512);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/testDeadNodeMultipleDFSInputStream");
// 256 bytes data chunk for writes
byte[] bytes = new byte[256];
for (int index = 0; index < bytes.length; index++) {
bytes[index] = '0';
}
// File with a 512 bytes block size
FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512);
// Write a block to DN (2x256bytes).
out.write(bytes);
out.write(bytes);
out.hflush();
out.close();
String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
FSDataInputStream in1 = fs.open(filePath);
DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream();
DFSClient dfsClient1 = din1.getDFSClient();
cluster.stopDataNode(0);
FSDataInputStream in2 = fs.open(filePath);
DFSInputStream din2 = null;
DFSClient dfsClient2 = null;
try {
try {
in1.read();
} catch (BlockMissingException e) {
}
din2 = (DFSInputStream) in1.getWrappedStream();
dfsClient2 = din2.getDFSClient();
assertEquals(1, dfsClient1.getDeadNodes(din1).size());
assertEquals(1, dfsClient2.getDeadNodes(din2).size());
assertEquals(1, dfsClient1.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
assertEquals(1, dfsClient2.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
// check the dn uuid of dead node to see if its expected dead node
assertEquals(datanodeUuid,
((DatanodeInfo) dfsClient1.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
assertEquals(datanodeUuid,
((DatanodeInfo) dfsClient2.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid());
} finally {
in1.close();
in2.close();
fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true);
// check the dead node again here, the dead node is expected be removed
assertEquals(0, dfsClient1.getDeadNodes(din1).size());
assertEquals(0, dfsClient2.getDeadNodes(din2).size());
assertEquals(0, dfsClient1.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
assertEquals(0, dfsClient2.getClientContext().getDeadNodeDetector()
.clearAndGetDetectedDeadNodes().size());
}
}
}