HDFS-6995. Block should be placed in the client's 'rack-local' node if 'client-local' node is not available (vinayakumarb)
This commit is contained in:
parent
16333b4fd8
commit
ed841dd9a9
|
@ -1054,6 +1054,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
|
HDFS-7131. During HA upgrade, JournalNode should create a new committedTxnId
|
||||||
file in the current directory. (jing9)
|
file in the current directory. (jing9)
|
||||||
|
|
||||||
|
HDFS-6995. Block should be placed in the client's 'rack-local' node
|
||||||
|
if 'client-local' node is not available (vinayakumarb)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
|
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
|
||||||
|
|
||||||
HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
|
HDFS-6677. Change INodeFile and FSImage to support storage policy ID.
|
||||||
|
|
|
@ -1493,7 +1493,7 @@ public class BlockManager {
|
||||||
/** Choose target for getting additional datanodes for an existing pipeline. */
|
/** Choose target for getting additional datanodes for an existing pipeline. */
|
||||||
public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
|
public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
|
||||||
int numAdditionalNodes,
|
int numAdditionalNodes,
|
||||||
DatanodeDescriptor clientnode,
|
Node clientnode,
|
||||||
List<DatanodeStorageInfo> chosen,
|
List<DatanodeStorageInfo> chosen,
|
||||||
Set<Node> excludes,
|
Set<Node> excludes,
|
||||||
long blocksize,
|
long blocksize,
|
||||||
|
@ -1513,7 +1513,7 @@ public class BlockManager {
|
||||||
* Set, long, List, BlockStoragePolicy)
|
* Set, long, List, BlockStoragePolicy)
|
||||||
*/
|
*/
|
||||||
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
|
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
|
||||||
final int numOfReplicas, final DatanodeDescriptor client,
|
final int numOfReplicas, final Node client,
|
||||||
final Set<Node> excludedNodes,
|
final Set<Node> excludedNodes,
|
||||||
final long blocksize,
|
final long blocksize,
|
||||||
final List<String> favoredNodes,
|
final List<String> favoredNodes,
|
||||||
|
|
|
@ -204,11 +204,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
// add localMachine and related nodes to excludedNodes
|
// add localMachine and related nodes to excludedNodes
|
||||||
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!clusterMap.contains(writer)) {
|
|
||||||
writer = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean avoidStaleNodes = (stats != null
|
boolean avoidStaleNodes = (stats != null
|
||||||
&& stats.isAvoidingStaleDataNodesForWrite());
|
&& stats.isAvoidingStaleDataNodesForWrite());
|
||||||
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
|
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
|
||||||
|
@ -219,8 +215,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
// sorting nodes to form a pipeline
|
// sorting nodes to form a pipeline
|
||||||
return getPipeline((writer==null)?localNode:writer,
|
return getPipeline(
|
||||||
results.toArray(new DatanodeStorageInfo[results.size()]));
|
(writer != null && writer instanceof DatanodeDescriptor) ? writer
|
||||||
|
: localNode,
|
||||||
|
results.toArray(new DatanodeStorageInfo[results.size()]));
|
||||||
}
|
}
|
||||||
|
|
||||||
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
|
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
|
||||||
|
@ -271,7 +269,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
final EnumSet<StorageType> unavailableStorages,
|
final EnumSet<StorageType> unavailableStorages,
|
||||||
final boolean newBlock) {
|
final boolean newBlock) {
|
||||||
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
||||||
return writer;
|
return (writer instanceof DatanodeDescriptor) ? writer : null;
|
||||||
}
|
}
|
||||||
final int numOfResults = results.size();
|
final int numOfResults = results.size();
|
||||||
final int totalReplicasExpected = numOfReplicas + numOfResults;
|
final int totalReplicasExpected = numOfReplicas + numOfResults;
|
||||||
|
|
|
@ -691,8 +691,7 @@ public class DatanodeManager {
|
||||||
names.add(node.getHostName());
|
names.add(node.getHostName());
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolve its network location
|
List<String> rName = resolveNetworkLocation(names);
|
||||||
List<String> rName = dnsToSwitchMapping.resolve(names);
|
|
||||||
String networkLocation;
|
String networkLocation;
|
||||||
if (rName == null) {
|
if (rName == null) {
|
||||||
LOG.error("The resolve call returned null!");
|
LOG.error("The resolve call returned null!");
|
||||||
|
@ -704,6 +703,18 @@ public class DatanodeManager {
|
||||||
return networkLocation;
|
return networkLocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resolve network locations for specified hosts
|
||||||
|
*
|
||||||
|
* @param names
|
||||||
|
* @return Network locations if available, Else returns null
|
||||||
|
*/
|
||||||
|
public List<String> resolveNetworkLocation(List<String> names) {
|
||||||
|
// resolve its network location
|
||||||
|
List<String> rName = dnsToSwitchMapping.resolve(names);
|
||||||
|
return rName;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resolve a node's dependencies in the network. If the DNS to switch
|
* Resolve a node's dependencies in the network. If the DNS to switch
|
||||||
* mapping fails then this method returns empty list of dependencies
|
* mapping fails then this method returns empty list of dependencies
|
||||||
|
|
|
@ -266,6 +266,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.net.NodeBase;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
@ -3122,7 +3123,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
final long blockSize;
|
final long blockSize;
|
||||||
final int replication;
|
final int replication;
|
||||||
final byte storagePolicyID;
|
final byte storagePolicyID;
|
||||||
DatanodeDescriptor clientNode = null;
|
Node clientNode = null;
|
||||||
|
String clientMachine = null;
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: "
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: "
|
||||||
|
@ -3153,14 +3155,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
+ maxBlocksPerFile);
|
+ maxBlocksPerFile);
|
||||||
}
|
}
|
||||||
blockSize = pendingFile.getPreferredBlockSize();
|
blockSize = pendingFile.getPreferredBlockSize();
|
||||||
|
clientMachine = pendingFile.getFileUnderConstructionFeature()
|
||||||
|
.getClientMachine();
|
||||||
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
|
clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
|
||||||
pendingFile.getFileUnderConstructionFeature().getClientMachine());
|
clientMachine);
|
||||||
replication = pendingFile.getFileReplication();
|
replication = pendingFile.getFileReplication();
|
||||||
storagePolicyID = pendingFile.getStoragePolicyID();
|
storagePolicyID = pendingFile.getStoragePolicyID();
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (clientNode == null) {
|
||||||
|
clientNode = getClientNode(clientMachine);
|
||||||
|
}
|
||||||
|
|
||||||
// choose targets for the new block to be allocated.
|
// choose targets for the new block to be allocated.
|
||||||
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
|
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget4NewBlock(
|
||||||
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
|
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
|
||||||
|
@ -3217,6 +3225,24 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return makeLocatedBlock(newBlock, targets, offset);
|
return makeLocatedBlock(newBlock, targets, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Resolve clientmachine address to get a network location path
|
||||||
|
*/
|
||||||
|
private Node getClientNode(String clientMachine) {
|
||||||
|
List<String> hosts = new ArrayList<String>(1);
|
||||||
|
hosts.add(clientMachine);
|
||||||
|
List<String> rName = getBlockManager().getDatanodeManager()
|
||||||
|
.resolveNetworkLocation(hosts);
|
||||||
|
Node clientNode = null;
|
||||||
|
if (rName != null) {
|
||||||
|
// Able to resolve clientMachine mapping.
|
||||||
|
// Create a temp node to findout the rack local nodes
|
||||||
|
clientNode = new NodeBase(rName.get(0) + NodeBase.PATH_SEPARATOR_STR
|
||||||
|
+ clientMachine);
|
||||||
|
}
|
||||||
|
return clientNode;
|
||||||
|
}
|
||||||
|
|
||||||
static class FileState {
|
static class FileState {
|
||||||
public final INodeFile inode;
|
public final INodeFile inode;
|
||||||
public final String path;
|
public final String path;
|
||||||
|
@ -3350,7 +3376,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
//check if the feature is enabled
|
//check if the feature is enabled
|
||||||
dtpReplaceDatanodeOnFailure.checkEnabled();
|
dtpReplaceDatanodeOnFailure.checkEnabled();
|
||||||
|
|
||||||
final DatanodeDescriptor clientnode;
|
Node clientnode = null;
|
||||||
|
String clientMachine;
|
||||||
final long preferredblocksize;
|
final long preferredblocksize;
|
||||||
final byte storagePolicyID;
|
final byte storagePolicyID;
|
||||||
final List<DatanodeStorageInfo> chosen;
|
final List<DatanodeStorageInfo> chosen;
|
||||||
|
@ -3375,8 +3402,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
if (inode != null) src = inode.getFullPathName();
|
if (inode != null) src = inode.getFullPathName();
|
||||||
}
|
}
|
||||||
final INodeFile file = checkLease(src, clientName, inode, fileId);
|
final INodeFile file = checkLease(src, clientName, inode, fileId);
|
||||||
String clientMachine = file.getFileUnderConstructionFeature()
|
clientMachine = file.getFileUnderConstructionFeature().getClientMachine();
|
||||||
.getClientMachine();
|
|
||||||
clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
||||||
preferredblocksize = file.getPreferredBlockSize();
|
preferredblocksize = file.getPreferredBlockSize();
|
||||||
storagePolicyID = file.getStoragePolicyID();
|
storagePolicyID = file.getStoragePolicyID();
|
||||||
|
@ -3388,6 +3414,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (clientnode == null) {
|
||||||
|
clientnode = getClientNode(clientMachine);
|
||||||
|
}
|
||||||
|
|
||||||
// choose new datanodes.
|
// choose new datanodes.
|
||||||
final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
|
final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
|
||||||
src, numAdditionalNodes, clientnode, chosen,
|
src, numAdditionalNodes, clientnode, chosen,
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||||
|
@ -142,10 +141,18 @@ public class TestReplicationPolicyConsiderLoad {
|
||||||
}
|
}
|
||||||
assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
|
assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
|
||||||
|
|
||||||
|
// update references of writer DN to update the de-commissioned state
|
||||||
|
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
|
||||||
|
dnManager.fetchDatanodes(liveNodes, null, false);
|
||||||
|
DatanodeDescriptor writerDn = null;
|
||||||
|
if (liveNodes.contains(dataNodes[0])) {
|
||||||
|
writerDn = liveNodes.get(liveNodes.indexOf(dataNodes[0]));
|
||||||
|
}
|
||||||
|
|
||||||
// Call chooseTarget()
|
// Call chooseTarget()
|
||||||
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
|
||||||
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
|
.getBlockPlacementPolicy().chooseTarget("testFile.txt", 3,
|
||||||
dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
|
writerDn, new ArrayList<DatanodeStorageInfo>(), false, null,
|
||||||
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY);
|
||||||
|
|
||||||
assertEquals(3, targets.length);
|
assertEquals(3, targets.length);
|
||||||
|
|
Loading…
Reference in New Issue