HDFS-16182.numOfReplicas is given the wrong value in BlockPlacementPolicyDefault$chooseTarget can cause DataStreamer to fail with Heterogeneous Storage. (#3320)
This commit is contained in:
parent
c9f95b01ef
commit
5626734a36
|
@ -435,7 +435,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
* @param storageTypes storage type to be considered for target
|
* @param storageTypes storage type to be considered for target
|
||||||
* @return local node of writer (not chosen node)
|
* @return local node of writer (not chosen node)
|
||||||
*/
|
*/
|
||||||
private Node chooseTarget(int numOfReplicas,
|
private Node chooseTarget(final int numOfReplicas,
|
||||||
Node writer,
|
Node writer,
|
||||||
final Set<Node> excludedNodes,
|
final Set<Node> excludedNodes,
|
||||||
final long blocksize,
|
final long blocksize,
|
||||||
|
@ -469,7 +469,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
LOG.trace("storageTypes={}", storageTypes);
|
LOG.trace("storageTypes={}", storageTypes);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
|
if (requiredStorageTypes.size() == 0) {
|
||||||
throw new NotEnoughReplicasException(
|
throw new NotEnoughReplicasException(
|
||||||
"All required storage types are unavailable: "
|
"All required storage types are unavailable: "
|
||||||
+ " unavailableStorages=" + unavailableStorages
|
+ " unavailableStorages=" + unavailableStorages
|
||||||
|
@ -498,10 +498,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
for (DatanodeStorageInfo resultStorage : results) {
|
for (DatanodeStorageInfo resultStorage : results) {
|
||||||
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
|
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
|
||||||
}
|
}
|
||||||
// Set numOfReplicas, since it can get out of sync with the result list
|
// Set newNumOfReplicas, since it can get out of sync with the result list
|
||||||
// if the NotEnoughReplicasException was thrown in chooseRandom().
|
// if the NotEnoughReplicasException was thrown in chooseRandom().
|
||||||
numOfReplicas = totalReplicasExpected - results.size();
|
int newNumOfReplicas = totalReplicasExpected - results.size();
|
||||||
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
|
return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize,
|
||||||
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
||||||
newBlock, null);
|
newBlock, null);
|
||||||
}
|
}
|
||||||
|
@ -520,8 +520,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
|
addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
|
||||||
oldExcludedNodes);
|
oldExcludedNodes);
|
||||||
}
|
}
|
||||||
numOfReplicas = totalReplicasExpected - results.size();
|
int newNumOfReplicas = totalReplicasExpected - results.size();
|
||||||
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
|
return chooseTarget(newNumOfReplicas, writer, oldExcludedNodes, blocksize,
|
||||||
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
|
||||||
newBlock, null);
|
newBlock, null);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1337,6 +1337,59 @@ public class TestBlockStoragePolicy {
|
||||||
Assert.assertEquals(StorageType.DISK, targets[1].getStorageType());
|
Assert.assertEquals(StorageType.DISK, targets[1].getStorageType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consider a File with All_SSD storage policy.
|
||||||
|
* 1. Choose 3 DISK DNs for pipeline because SSD DNs no enough at
|
||||||
|
* the beginning.
|
||||||
|
* 2. One of DISK DNs fails And it need choose one new DN for existing.
|
||||||
|
* pipeline {@link DataStreamer addDatanode2ExistingPipeline()}.
|
||||||
|
* Make sure the number of target DNs are 3.
|
||||||
|
* see HDFS-16182.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAddDatanode2ExistingPipelineInSsd() throws Exception {
|
||||||
|
BlockStoragePolicy policy = POLICY_SUITE.getPolicy(ALLSSD);
|
||||||
|
|
||||||
|
final String[] racks = {"/d1/r1", "/d2/r2", "/d3/r3", "/d4/r4", "/d5/r5",
|
||||||
|
"/d6/r6", "/d7/r7"};
|
||||||
|
final String[] hosts = {"host1", "host2", "host3", "host4", "host5",
|
||||||
|
"host6", "host7"};
|
||||||
|
final StorageType[] disks = {StorageType.DISK, StorageType.DISK, StorageType.DISK};
|
||||||
|
|
||||||
|
final DatanodeStorageInfo[] diskStorages
|
||||||
|
= DFSTestUtil.createDatanodeStorageInfos(7, racks, hosts, disks);
|
||||||
|
final DatanodeDescriptor[] dataNodes
|
||||||
|
= DFSTestUtil.toDatanodeDescriptor(diskStorages);
|
||||||
|
for (int i = 0; i < dataNodes.length; i++) {
|
||||||
|
BlockManagerTestUtil.updateStorage(dataNodes[i],
|
||||||
|
new DatanodeStorage("ssd" + i + 1, DatanodeStorage.State.NORMAL,
|
||||||
|
StorageType.SSD));
|
||||||
|
}
|
||||||
|
|
||||||
|
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
||||||
|
new File(baseDir, "name").getPath());
|
||||||
|
DFSTestUtil.formatNameNode(conf);
|
||||||
|
NameNode namenode = new NameNode(conf);
|
||||||
|
|
||||||
|
final BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||||
|
BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
|
||||||
|
NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
|
||||||
|
for (DatanodeDescriptor datanode : dataNodes) {
|
||||||
|
cluster.add(datanode);
|
||||||
|
}
|
||||||
|
// chsenDs are DISK StorageType to simulate not enough SDD Storage
|
||||||
|
List<DatanodeStorageInfo> chsenDs = new ArrayList<>();
|
||||||
|
chsenDs.add(diskStorages[0]);
|
||||||
|
chsenDs.add(diskStorages[1]);
|
||||||
|
DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 1,
|
||||||
|
null, chsenDs, true,
|
||||||
|
new HashSet<Node>(), 0, policy, null);
|
||||||
|
Assert.assertEquals(3, targets.length);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
|
public void testGetFileStoragePolicyAfterRestartNN() throws Exception {
|
||||||
//HDFS8219
|
//HDFS8219
|
||||||
|
|
Loading…
Reference in New Issue