HDFS-11309. [SPS]: chooseTargetTypeInSameNode should pass accurate block size to chooseStorage4Block while choosing target. Contributed by Uma Maheswara Rao G
This commit is contained in:
parent
681d2804c9
commit
f8fc96a66e
|
@ -403,24 +403,25 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
List<StorageType> sourceStorageTypes = new ArrayList<>();
|
||||
List<DatanodeInfo> targetNodes = new ArrayList<>();
|
||||
List<StorageType> targetStorageTypes = new ArrayList<>();
|
||||
List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
|
||||
List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
|
||||
|
||||
// Looping over all the source node locations and choose the target
|
||||
// storage within same node if possible. This is done separately to
|
||||
// avoid choosing a target which already has this block.
|
||||
for (int i = 0; i < sourceWithStorageList.size(); i++) {
|
||||
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
|
||||
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
|
||||
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
|
||||
existingTypeNodePair.dn, expected);
|
||||
if (chosenTarget != null) {
|
||||
sourceNodes.add(existingTypeNodePair.dn);
|
||||
sourceStorageTypes.add(existingTypeNodePair.storageType);
|
||||
targetNodes.add(chosenTarget.dn);
|
||||
targetStorageTypes.add(chosenTarget.storageType);
|
||||
chosenNodes.add(chosenTarget.dn);
|
||||
expected.remove(chosenTarget.storageType);
|
||||
// TODO: We can increment scheduled block count for this node?
|
||||
}
|
||||
// To avoid choosing this excludeNodes as targets later
|
||||
excludeNodes.add(existingTypeNodePair.dn);
|
||||
}
|
||||
|
||||
// Looping over all the source node locations. Choose a remote target
|
||||
|
@ -437,28 +438,28 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
.getNetworkTopology().isNodeGroupAware()) {
|
||||
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
|
||||
expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
|
||||
chosenNodes);
|
||||
excludeNodes);
|
||||
}
|
||||
|
||||
// Then, match nodes on the same rack
|
||||
if (chosenTarget == null) {
|
||||
chosenTarget =
|
||||
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
|
||||
Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
|
||||
Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
|
||||
}
|
||||
|
||||
if (chosenTarget == null) {
|
||||
chosenTarget =
|
||||
chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
|
||||
Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
|
||||
Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
|
||||
}
|
||||
if (null != chosenTarget) {
|
||||
sourceNodes.add(existingTypeNodePair.dn);
|
||||
sourceStorageTypes.add(existingTypeNodePair.storageType);
|
||||
targetNodes.add(chosenTarget.dn);
|
||||
targetStorageTypes.add(chosenTarget.storageType);
|
||||
chosenNodes.add(chosenTarget.dn);
|
||||
expected.remove(chosenTarget.storageType);
|
||||
excludeNodes.add(chosenTarget.dn);
|
||||
// TODO: We can increment scheduled block count for this node?
|
||||
} else {
|
||||
LOG.warn(
|
||||
|
@ -554,14 +555,18 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
/**
|
||||
* Choose the target storage within same datanode if possible.
|
||||
*
|
||||
* @param source source datanode
|
||||
* @param targetTypes list of target storage types
|
||||
* @param block
|
||||
* - block info
|
||||
* @param source
|
||||
* - source datanode
|
||||
* @param targetTypes
|
||||
* - list of target storage types
|
||||
*/
|
||||
private StorageTypeNodePair chooseTargetTypeInSameNode(
|
||||
private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
|
||||
DatanodeDescriptor source, List<StorageType> targetTypes) {
|
||||
for (StorageType t : targetTypes) {
|
||||
DatanodeStorageInfo chooseStorage4Block =
|
||||
source.chooseStorage4Block(t, 0);
|
||||
source.chooseStorage4Block(t, block.getNumBytes());
|
||||
if (chooseStorage4Block != null) {
|
||||
return new StorageTypeNodePair(t, source);
|
||||
}
|
||||
|
@ -572,7 +577,7 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
private StorageTypeNodePair chooseTarget(Block block,
|
||||
DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
|
||||
StorageTypeNodeMap locsForExpectedStorageTypes,
|
||||
List<DatanodeDescriptor> chosenNodes) {
|
||||
List<DatanodeDescriptor> excludeNodes) {
|
||||
for (StorageType t : targetTypes) {
|
||||
List<DatanodeDescriptor> nodesWithStorages =
|
||||
locsForExpectedStorageTypes.getNodesWithStorages(t);
|
||||
|
@ -581,7 +586,7 @@ public class StoragePolicySatisfier implements Runnable {
|
|||
}
|
||||
Collections.shuffle(nodesWithStorages);
|
||||
for (DatanodeDescriptor target : nodesWithStorages) {
|
||||
if (!chosenNodes.contains(target) && matcher.match(
|
||||
if (!excludeNodes.contains(target) && matcher.match(
|
||||
blockManager.getDatanodeManager().getNetworkTopology(), source,
|
||||
target)) {
|
||||
if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
|
||||
|
|
|
@ -18,12 +18,14 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
|
@ -57,6 +60,8 @@ import com.google.common.base.Supplier;
|
|||
* moved and finding its suggested target locations to move.
|
||||
*/
|
||||
public class TestStoragePolicySatisfier {
|
||||
private static final String ONE_SSD = "ONE_SSD";
|
||||
private static final String COLD = "COLD";
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
|
||||
private final Configuration config = new HdfsConfiguration();
|
||||
|
@ -93,7 +98,7 @@ public class TestStoragePolicySatisfier {
|
|||
try {
|
||||
createCluster();
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file), COLD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -151,7 +156,7 @@ public class TestStoragePolicySatisfier {
|
|||
try {
|
||||
createCluster();
|
||||
// Change policy to ONE_SSD
|
||||
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
|
||||
dfs.setStoragePolicy(new Path(file), ONE_SSD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -184,7 +189,7 @@ public class TestStoragePolicySatisfier {
|
|||
try {
|
||||
createCluster();
|
||||
// Change policy to ONE_SSD
|
||||
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
|
||||
dfs.setStoragePolicy(new Path(file), ONE_SSD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -232,7 +237,7 @@ public class TestStoragePolicySatisfier {
|
|||
List<Long> blockCollectionIds = new ArrayList<>();
|
||||
// Change policy to ONE_SSD
|
||||
for (String fileName : files) {
|
||||
dfs.setStoragePolicy(new Path(fileName), "ONE_SSD");
|
||||
dfs.setStoragePolicy(new Path(fileName), ONE_SSD);
|
||||
INode inode = namesystem.getFSDirectory().getINode(fileName);
|
||||
blockCollectionIds.add(inode.getId());
|
||||
}
|
||||
|
@ -274,12 +279,12 @@ public class TestStoragePolicySatisfier {
|
|||
HdfsAdmin hdfsAdmin =
|
||||
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file), COLD);
|
||||
|
||||
StorageType[][] newtypes =
|
||||
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||
{StorageType.ARCHIVE, StorageType.ARCHIVE},
|
||||
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
|
||||
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
|
||||
{StorageType.DISK, StorageType.ARCHIVE},
|
||||
{StorageType.DISK, StorageType.ARCHIVE}};
|
||||
startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
|
||||
storagesPerDatanode, capacity, hdfsCluster);
|
||||
|
||||
|
@ -314,7 +319,7 @@ public class TestStoragePolicySatisfier {
|
|||
writeContent(subFile2);
|
||||
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(subDir), "ONE_SSD");
|
||||
dfs.setStoragePolicy(new Path(subDir), ONE_SSD);
|
||||
|
||||
StorageType[][] newtypes =
|
||||
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
|
||||
|
@ -418,7 +423,7 @@ public class TestStoragePolicySatisfier {
|
|||
try {
|
||||
createCluster();
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file), COLD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -463,7 +468,7 @@ public class TestStoragePolicySatisfier {
|
|||
try {
|
||||
createCluster();
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file), COLD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -533,7 +538,7 @@ public class TestStoragePolicySatisfier {
|
|||
final String file1 = createFileAndSimulateFavoredNodes(2);
|
||||
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file1), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file1), COLD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file1);
|
||||
|
||||
|
@ -594,7 +599,7 @@ public class TestStoragePolicySatisfier {
|
|||
writeContent(file, (short) 5);
|
||||
|
||||
// Change policy to COLD
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
dfs.setStoragePolicy(new Path(file), COLD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -633,7 +638,7 @@ public class TestStoragePolicySatisfier {
|
|||
writeContent(file);
|
||||
|
||||
// Change policy to ONE_SSD
|
||||
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
|
||||
dfs.setStoragePolicy(new Path(file), ONE_SSD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
|
||||
|
@ -688,6 +693,77 @@ public class TestStoragePolicySatisfier {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that movements should not be assigned when there is no space in
|
||||
* target DN.
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testChooseInSameDatanodeWithONESSDShouldNotChooseIfNoSpace()
|
||||
throws Exception {
|
||||
StorageType[][] diskTypes =
|
||||
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
|
||||
{StorageType.DISK, StorageType.SSD},
|
||||
{StorageType.DISK, StorageType.DISK}};
|
||||
config.setLong("dfs.block.size", 2 * DEFAULT_BLOCK_SIZE);
|
||||
long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
|
||||
try {
|
||||
hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
|
||||
storagesPerDatanode, dnCapacity);
|
||||
dfs = hdfsCluster.getFileSystem();
|
||||
writeContent(file);
|
||||
|
||||
// Change policy to ONE_SSD
|
||||
dfs.setStoragePolicy(new Path(file), ONE_SSD);
|
||||
FSNamesystem namesystem = hdfsCluster.getNamesystem();
|
||||
INode inode = namesystem.getFSDirectory().getINode(file);
|
||||
Path filePath = new Path("/testChooseInSameDatanode");
|
||||
final FSDataOutputStream out =
|
||||
dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
|
||||
try {
|
||||
dfs.setStoragePolicy(filePath, ONE_SSD);
|
||||
// Try to fill up SSD part by writing content
|
||||
long remaining = dfs.getStatus().getRemaining() / (3 * 2);
|
||||
for (int i = 0; i < remaining; i++) {
|
||||
out.write(i);
|
||||
}
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
hdfsCluster.triggerHeartbeats();
|
||||
ArrayList<DataNode> dataNodes = hdfsCluster.getDataNodes();
|
||||
// Temporarily disable heart beats, so that we can assert whether any
|
||||
// items schedules for DNs even though DN's does not have space to write.
|
||||
// Disabling heart beats can keep scheduled items on DatanodeDescriptor
|
||||
// itself.
|
||||
for (DataNode dataNode : dataNodes) {
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
|
||||
}
|
||||
namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
|
||||
|
||||
// Wait for items to be processed
|
||||
waitForAttemptedItems(1, 30000);
|
||||
|
||||
// Make sure no items assigned for movements
|
||||
Set<DatanodeDescriptor> dns = hdfsCluster.getNamesystem()
|
||||
.getBlockManager().getDatanodeManager().getDatanodes();
|
||||
for (DatanodeDescriptor dd : dns) {
|
||||
assertNull(dd.getBlocksToMoveStorages());
|
||||
}
|
||||
|
||||
// Enable heart beats now
|
||||
for (DataNode dataNode : dataNodes) {
|
||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, false);
|
||||
}
|
||||
hdfsCluster.triggerHeartbeats();
|
||||
|
||||
DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
|
||||
dfs);
|
||||
DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
|
||||
throws IOException {
|
||||
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
|
||||
|
@ -769,8 +845,8 @@ public class TestStoragePolicySatisfier {
|
|||
// write to DISK
|
||||
final FSDataOutputStream out = dfs.create(new Path(fileName),
|
||||
replicatonFactor);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
out.writeChars("t");
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
out.write(i);
|
||||
}
|
||||
out.close();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue