HDFS-11293: [SPS]: Local DN should be given preference as source node, when target available in same node. Contributed by Yuanbo Liu and Uma Maheswara Rao G
This commit is contained in:
parent
422f870607
commit
df2b551e79
|
@ -298,9 +298,25 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
new ArrayList<StorageTypeNodePair>();
|
new ArrayList<StorageTypeNodePair>();
|
||||||
List<DatanodeStorageInfo> existingBlockStorages =
|
List<DatanodeStorageInfo> existingBlockStorages =
|
||||||
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
|
new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
|
||||||
|
// if expected type exists in source node already, local movement would be
|
||||||
|
// possible, so lets find such sources first.
|
||||||
|
Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
|
||||||
|
if (checkSourceAndTargetTypeExists(
|
||||||
|
datanodeStorageInfo.getDatanodeDescriptor(), existing,
|
||||||
|
expectedStorageTypes)) {
|
||||||
|
sourceWithStorageMap
|
||||||
|
.add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
|
||||||
|
datanodeStorageInfo.getDatanodeDescriptor()));
|
||||||
|
iterator.remove();
|
||||||
|
existing.remove(datanodeStorageInfo.getStorageType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let's find sources for existing types left.
|
||||||
for (StorageType existingType : existing) {
|
for (StorageType existingType : existing) {
|
||||||
Iterator<DatanodeStorageInfo> iterator =
|
iterator = existingBlockStorages.iterator();
|
||||||
existingBlockStorages.iterator();
|
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
|
DatanodeStorageInfo datanodeStorageInfo = iterator.next();
|
||||||
StorageType storageType = datanodeStorageInfo.getStorageType();
|
StorageType storageType = datanodeStorageInfo.getStorageType();
|
||||||
|
@ -317,7 +333,7 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
findTargetsForExpectedStorageTypes(expectedStorageTypes);
|
findTargetsForExpectedStorageTypes(expectedStorageTypes);
|
||||||
|
|
||||||
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
|
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
|
||||||
blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
|
blockMovingInfos, blockInfo, sourceWithStorageMap,
|
||||||
expectedStorageTypes, locsForExpectedStorageTypes);
|
expectedStorageTypes, locsForExpectedStorageTypes);
|
||||||
}
|
}
|
||||||
return foundMatchingTargetNodesForBlock;
|
return foundMatchingTargetNodesForBlock;
|
||||||
|
@ -366,8 +382,6 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
* - list of block source and target node pair
|
* - list of block source and target node pair
|
||||||
* @param blockInfo
|
* @param blockInfo
|
||||||
* - Block
|
* - Block
|
||||||
* @param existing
|
|
||||||
* - Existing storage types of block
|
|
||||||
* @param sourceWithStorageList
|
* @param sourceWithStorageList
|
||||||
* - Source Datanode with storages list
|
* - Source Datanode with storages list
|
||||||
* @param expected
|
* @param expected
|
||||||
|
@ -379,7 +393,6 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
*/
|
*/
|
||||||
private boolean findSourceAndTargetToMove(
|
private boolean findSourceAndTargetToMove(
|
||||||
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
|
List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
|
||||||
List<StorageType> existing,
|
|
||||||
List<StorageTypeNodePair> sourceWithStorageList,
|
List<StorageTypeNodePair> sourceWithStorageList,
|
||||||
List<StorageType> expected,
|
List<StorageType> expected,
|
||||||
StorageTypeNodeMap locsForExpectedStorageTypes) {
|
StorageTypeNodeMap locsForExpectedStorageTypes) {
|
||||||
|
@ -403,6 +416,7 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
targetNodes.add(chosenTarget.dn);
|
targetNodes.add(chosenTarget.dn);
|
||||||
targetStorageTypes.add(chosenTarget.storageType);
|
targetStorageTypes.add(chosenTarget.storageType);
|
||||||
chosenNodes.add(chosenTarget.dn);
|
chosenNodes.add(chosenTarget.dn);
|
||||||
|
expected.remove(chosenTarget.storageType);
|
||||||
// TODO: We can increment scheduled block count for this node?
|
// TODO: We can increment scheduled block count for this node?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -442,16 +456,20 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
targetNodes.add(chosenTarget.dn);
|
targetNodes.add(chosenTarget.dn);
|
||||||
targetStorageTypes.add(chosenTarget.storageType);
|
targetStorageTypes.add(chosenTarget.storageType);
|
||||||
chosenNodes.add(chosenTarget.dn);
|
chosenNodes.add(chosenTarget.dn);
|
||||||
|
expected.remove(chosenTarget.storageType);
|
||||||
// TODO: We can increment scheduled block count for this node?
|
// TODO: We can increment scheduled block count for this node?
|
||||||
} else {
|
} else {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Failed to choose target datanode for the required"
|
"Failed to choose target datanode for the required"
|
||||||
+ " storage types {}, block:{}, existing storage type:{}",
|
+ " storage types {}, block:{}, existing storage type:{}",
|
||||||
expected, blockInfo, existingTypeNodePair.storageType);
|
expected, blockInfo, existingTypeNodePair.storageType);
|
||||||
foundMatchingTargetNodesForBlock = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (expected.size() > 0) {
|
||||||
|
foundMatchingTargetNodesForBlock = false;
|
||||||
|
}
|
||||||
|
|
||||||
blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
|
blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
|
||||||
sourceStorageTypes, targetNodes, targetStorageTypes));
|
sourceStorageTypes, targetNodes, targetStorageTypes));
|
||||||
return foundMatchingTargetNodesForBlock;
|
return foundMatchingTargetNodesForBlock;
|
||||||
|
@ -616,6 +634,23 @@ public class StoragePolicySatisfier implements Runnable {
|
||||||
return max;
|
return max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
|
||||||
|
List<StorageType> existing, List<StorageType> expectedStorageTypes) {
|
||||||
|
DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
|
||||||
|
boolean isExpectedTypeAvailable = false;
|
||||||
|
boolean isExistingTypeAvailable = false;
|
||||||
|
for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
|
||||||
|
StorageType storageType = dnInfo.getStorageType();
|
||||||
|
if (existing.contains(storageType)) {
|
||||||
|
isExistingTypeAvailable = true;
|
||||||
|
}
|
||||||
|
if (expectedStorageTypes.contains(storageType)) {
|
||||||
|
isExpectedTypeAvailable = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return isExistingTypeAvailable && isExpectedTypeAvailable;
|
||||||
|
}
|
||||||
|
|
||||||
private static class StorageTypeNodeMap {
|
private static class StorageTypeNodeMap {
|
||||||
private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
|
private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
|
||||||
new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
|
new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
|
||||||
|
|
|
@ -580,6 +580,77 @@ public class TestStoragePolicySatisfier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that moving block storage with in the same datanode. Let's say we
|
||||||
|
* have DN1[DISK,ARCHIVE], DN2[DISK, SSD], DN3[DISK,RAM_DISK] when
|
||||||
|
* storagepolicy set to ONE_SSD and request satisfyStoragePolicy, then block
|
||||||
|
* should move to DN2[SSD] successfully.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testBlockMoveInSameDatanodeWithONESSD() throws Exception {
|
||||||
|
StorageType[][] diskTypes =
|
||||||
|
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
|
{StorageType.DISK, StorageType.SSD},
|
||||||
|
{StorageType.DISK, StorageType.RAM_DISK}};
|
||||||
|
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
||||||
|
try {
|
||||||
|
hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
|
||||||
|
storagesPerDatanode, capacity);
|
||||||
|
dfs = hdfsCluster.getFileSystem();
|
||||||
|
writeContent(file);
|
||||||
|
|
||||||
|
// Change policy to ONE_SSD
|
||||||
|
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
|
||||||
|
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||||
|
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||||
|
|
||||||
|
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||||
|
hdfsCluster.triggerHeartbeats();
|
||||||
|
waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
|
||||||
|
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that moving block storage with in the same datanode and remote node.
|
||||||
|
* Let's say we have DN1[DISK,ARCHIVE], DN2[ARCHIVE, SSD], DN3[DISK,DISK],
|
||||||
|
* DN4[DISK,DISK] when storagepolicy set to WARM and request
|
||||||
|
* satisfyStoragePolicy, then block should move to DN1[ARCHIVE] and
|
||||||
|
* DN2[ARCHIVE] successfully.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testBlockMoveInSameAndRemoteDatanodesWithWARM() throws Exception {
|
||||||
|
StorageType[][] diskTypes =
|
||||||
|
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
|
||||||
|
{StorageType.ARCHIVE, StorageType.SSD},
|
||||||
|
{StorageType.DISK, StorageType.DISK},
|
||||||
|
{StorageType.DISK, StorageType.DISK}};
|
||||||
|
|
||||||
|
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
|
||||||
|
try {
|
||||||
|
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
|
||||||
|
storagesPerDatanode, capacity);
|
||||||
|
dfs = hdfsCluster.getFileSystem();
|
||||||
|
writeContent(file);
|
||||||
|
|
||||||
|
// Change policy to WARM
|
||||||
|
dfs.setStoragePolicy(new Path(file), "WARM");
|
||||||
|
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||||
|
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||||
|
|
||||||
|
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||||
|
hdfsCluster.triggerHeartbeats();
|
||||||
|
|
||||||
|
waitExpectedStorageType(file, StorageType.DISK, 1, 30000);
|
||||||
|
waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
|
||||||
|
} finally {
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
|
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
|
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
|
||||||
|
|
Loading…
Reference in New Issue