HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess replicas. (Xiao Chen via mingma)
This commit is contained in:
parent
1777608fa0
commit
0e54b164a8
|
@ -1676,6 +1676,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-7988. Replace usage of ExactSizeInputStream with LimitInputStream.
|
||||
(Walter Su via wheat9)
|
||||
|
||||
HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess
|
||||
replicas. (Xiao Chen via mingma)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -916,7 +916,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
public DatanodeStorageInfo chooseReplicaToDelete(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
final List<StorageType> excessTypes) {
|
||||
final List<StorageType> excessTypes,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
long oldestHeartbeat =
|
||||
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
|
||||
DatanodeStorageInfo oldestHeartbeatStorage = null;
|
||||
|
@ -926,7 +927,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
// Pick the node with the oldest heartbeat or with the least free space,
|
||||
// if all hearbeats are within the tolerable heartbeat interval
|
||||
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
|
||||
exactlyOne)) {
|
||||
exactlyOne, rackMap)) {
|
||||
if (!excessTypes.contains(storage.getStorageType())) {
|
||||
continue;
|
||||
}
|
||||
|
@ -991,7 +992,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
moreThanOne, exactlyOne, excessTypes)) {
|
||||
cur = delNodeHintStorage;
|
||||
} else { // regular excessive replica removal
|
||||
cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
|
||||
cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes,
|
||||
rackMap);
|
||||
}
|
||||
firstOne = false;
|
||||
if (cur == null) {
|
||||
|
@ -1044,16 +1046,34 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
|
||||
return notReduceNumOfGroups(moreThanOne, source, target);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick up replica node set for deleting replica as over-replicated.
|
||||
* First set contains replica nodes on rack with more than one
|
||||
* replica while second set contains remaining replica nodes.
|
||||
* So pick up first set if not empty. If first is empty, then pick second.
|
||||
* If only 1 rack, pick all. If 2 racks, pick all that have more than
|
||||
* 1 replicas on the same rack; if no such replicas, pick all.
|
||||
* If 3 or more racks, pick all.
|
||||
*/
|
||||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
Collection<DatanodeStorageInfo> ret = new ArrayList<>();
|
||||
if (rackMap.size() == 2) {
|
||||
for (List<DatanodeStorageInfo> dsi : rackMap.values()) {
|
||||
if (dsi.size() >= 2) {
|
||||
ret.addAll(dsi);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ret.isEmpty()) {
|
||||
// Return all replicas if rackMap.size() != 2
|
||||
// or rackMap.size() == 2 but no shared replicas on any rack
|
||||
ret.addAll(moreThanOne);
|
||||
ret.addAll(exactlyOne);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -169,4 +169,12 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
|
|||
racks.add(dn.getNetworkLocation());
|
||||
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -332,7 +332,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
|
|||
@Override
|
||||
public Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> first,
|
||||
Collection<DatanodeStorageInfo> second) {
|
||||
Collection<DatanodeStorageInfo> second,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
// If no replica within same rack, return directly.
|
||||
if (first.isEmpty()) {
|
||||
return second;
|
||||
|
|
|
@ -206,18 +206,18 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
* shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
|
||||
* will reduce the # of racks by 1 and won't change # of upgrade
|
||||
* domains.
|
||||
* Note that this is different from BlockPlacementPolicyDefault which
|
||||
* will keep the # of racks after deletion. With upgrade domain policy,
|
||||
* given # of racks is still >= 2 after deletion, the data availability
|
||||
* model remains the same as BlockPlacementPolicyDefault (only supports
|
||||
* one rack failure).
|
||||
* Note that this is similar to BlockPlacementPolicyDefault which
|
||||
* will at most reduce the # of racks by 1, and never reduce it to < 2.
|
||||
* With upgrade domain policy, given # of racks is still >= 2 after
|
||||
* deletion, the data availability model remains the same as
|
||||
* BlockPlacementPolicyDefault (only supports one rack failure).
|
||||
* For example, assume we have 4 replicas: d1(rack1, ud1),
|
||||
* d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
|
||||
* shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
|
||||
* With upgrade domain policy, the remaining replicas after deletion
|
||||
* are {d1(or d2), d3, d4} which has 2 racks.
|
||||
* With BlockPlacementPolicyDefault policy, the remaining replicas
|
||||
* after deletion are {d1, d2, d3(or d4)} which has 3 racks.
|
||||
* With BlockPlacementPolicyDefault policy, any of the 4 with the worst
|
||||
* condition will be deleted, which in worst case has 2 racks remain.
|
||||
* 3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty. This
|
||||
* implies all replicas are on unique racks. Removing a node from
|
||||
* shareUDNotRackSet will reduce # of racks (no different from
|
||||
|
@ -244,7 +244,8 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
@Override
|
||||
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne) {
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
// shareUDSet includes DatanodeStorageInfo that share same upgrade
|
||||
// domain with another DatanodeStorageInfo.
|
||||
Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne);
|
||||
|
@ -255,7 +256,7 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
|||
List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
|
||||
if (shareUDSet.size() == 0) {
|
||||
// All upgrade domains are unique, use the parent set.
|
||||
return super.pickupReplicaSet(moreThanOne, exactlyOne);
|
||||
return super.pickupReplicaSet(moreThanOne, exactlyOne, rackMap);
|
||||
} else if (moreThanOne != null) {
|
||||
for (DatanodeStorageInfo storage : shareUDSet) {
|
||||
if (moreThanOne.contains(storage)) {
|
||||
|
|
|
@ -972,22 +972,22 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
// test returning null
|
||||
excessTypes.add(StorageType.SSD);
|
||||
assertNull(((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete(first, second, excessTypes));
|
||||
.chooseReplicaToDelete(first, second, excessTypes, rackMap));
|
||||
}
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete(first, second, excessTypes);
|
||||
// Within first set, storages[1] with less free space
|
||||
assertEquals(chosen, storages[1]);
|
||||
.chooseReplicaToDelete(first, second, excessTypes, rackMap);
|
||||
// Within all storages, storages[5] with least free space
|
||||
assertEquals(chosen, storages[5]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
||||
assertEquals(0, first.size());
|
||||
assertEquals(3, second.size());
|
||||
// Within second set, storages[5] with less free space
|
||||
assertEquals(2, first.size());
|
||||
assertEquals(1, second.size());
|
||||
// Within first set, storages[1] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
first, second, excessTypes);
|
||||
assertEquals(chosen, storages[5]);
|
||||
first, second, excessTypes, rackMap);
|
||||
assertEquals(chosen, storages[1]);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1032,17 +1032,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
excessTypes, storages[3].getDatanodeDescriptor(), null);
|
||||
assertTrue(excessReplicas.contains(excessStorage));
|
||||
|
||||
|
||||
// The block was initially created on excessSSD(rack r1),
|
||||
// storages[4](rack r3) and storages[5](rack r3) with
|
||||
// ONESSD_STORAGE_POLICY_NAME storage policy.
|
||||
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
|
||||
// Right after balancer moves the block from storages[5] to
|
||||
// storages[3](rack r2), the application changes the storage policy from
|
||||
// ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
|
||||
// no replica can be chosen as the excessive replica as
|
||||
// chooseReplicasToDelete only considers storages[4] and storages[5] that
|
||||
// are the same rack. But neither's storage type is SSD.
|
||||
// TODO BlockPlacementPolicyDefault should be able to delete excessSSD.
|
||||
// we should be able to delete excessSSD since the remaining
|
||||
// storages ({storages[3]}, {storages[4], storages[5]})
|
||||
// are on different racks (r2, r3).
|
||||
nonExcess.clear();
|
||||
nonExcess.add(excessSSD);
|
||||
nonExcess.add(storages[3]);
|
||||
|
@ -1053,7 +1051,59 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
|||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(),
|
||||
storages[5].getDatanodeDescriptor());
|
||||
assertTrue(excessReplicas.size() == 0);
|
||||
assertEquals(1, excessReplicas.size());
|
||||
assertTrue(excessReplicas.contains(excessSSD));
|
||||
|
||||
// Similar to above, but after policy change and before deletion,
|
||||
// the replicas are located on excessSSD(rack r1), storages[1](rack r1),
|
||||
// storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
|
||||
// In this case, we should be able to delete excessSSD since the remaining
|
||||
// storages ({storages[1]} , {storages[2], storages[3]})
|
||||
// are on different racks (r1, r2).
|
||||
nonExcess.clear();
|
||||
nonExcess.add(excessSSD);
|
||||
nonExcess.add(storages[1]);
|
||||
nonExcess.add(storages[2]);
|
||||
nonExcess.add(storages[3]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[1].getDatanodeDescriptor(),
|
||||
storages[3].getDatanodeDescriptor());
|
||||
assertEquals(1, excessReplicas.size());
|
||||
assertTrue(excessReplicas.contains(excessSSD));
|
||||
|
||||
// Similar to above, but after policy change and before deletion,
|
||||
// the replicas are located on excessSSD(rack r1), storages[2](rack r2)
|
||||
// Replication factor = 1. We should be able to delete excessSSD.
|
||||
nonExcess.clear();
|
||||
nonExcess.add(excessSSD);
|
||||
nonExcess.add(storages[2]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 1,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1,
|
||||
excessTypes, storages[2].getDatanodeDescriptor(), null);
|
||||
assertEquals(1, excessReplicas.size());
|
||||
assertTrue(excessReplicas.contains(excessSSD));
|
||||
|
||||
// The block was initially created on excessSSD(rack r1),
|
||||
// storages[4](rack r3) and storages[5](rack r3) with
|
||||
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
|
||||
// In this case, no replica can be chosen as the excessive replica by
|
||||
// chooseReplicasToDelete because if the SSD storage is deleted,
|
||||
// the remaining storages[4] and storages[5] are the same rack (r3),
|
||||
// violating block placement policy (i.e. the number of racks >= 2).
|
||||
// TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
|
||||
// and then delete excessSSD.
|
||||
nonExcess.clear();
|
||||
nonExcess.add(excessSSD);
|
||||
nonExcess.add(storages[4]);
|
||||
nonExcess.add(storages[5]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 2,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2,
|
||||
excessTypes, null, null);
|
||||
assertEquals(0, excessReplicas.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
List<StorageType> excessTypes = new ArrayList<>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||
.chooseReplicaToDelete(first, second, excessTypes);
|
||||
.chooseReplicaToDelete(first, second, excessTypes, rackMap);
|
||||
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
||||
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
||||
// but dataNodes[1] is chosen as less free space
|
||||
|
@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
// as less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
first, second, excessTypes);
|
||||
first, second, excessTypes, rackMap);
|
||||
assertEquals(chosen, storages[2]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
||||
|
@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
|||
// Within second set, dataNodes[5] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||
first, second, excessTypes);
|
||||
first, second, excessTypes, rackMap);
|
||||
assertEquals(chosen, storages[5]);
|
||||
}
|
||||
|
||||
|
|
|
@ -389,6 +389,38 @@ public class TestReplicationPolicyWithUpgradeDomain
|
|||
assertTrue(excessReplicas.size() == 2);
|
||||
assertTrue(excessReplicas.contains(storages[0]));
|
||||
assertTrue(excessReplicas.contains(excessStorage));
|
||||
|
||||
// Test SSD related deletion. With different rack settings here, but
|
||||
// similar to {@link TestReplicationPolicy#testChooseReplicasToDelete}.
|
||||
// The block was initially created on excessSSD(rack r1, UD 4),
|
||||
// storages[7](rack r3, UD 2) and storages[8](rack r3, UD 3) with
|
||||
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
|
||||
// Right after balancer moves the block from storages[7] to
|
||||
// storages[3](rack r2, UD 1), the application changes the storage policy
|
||||
// from ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
|
||||
// we should be able to delete excessSSD since the remaining
|
||||
// storages ({storages[3]}, {storages[7], storages[8]})
|
||||
// are on different racks (r2, r3) and different UDs (1, 2, 3).
|
||||
DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
|
||||
"Storage-excess-SSD-ID", "localhost",
|
||||
storages[0].getDatanodeDescriptor().getNetworkLocation(), "foo.com",
|
||||
StorageType.SSD, null);
|
||||
DatanodeStorageInfo[] ssds = { excessSSD };
|
||||
DatanodeDescriptor ssdNodes[] = DFSTestUtil.toDatanodeDescriptor(ssds);
|
||||
ssdNodes[0].setUpgradeDomain(Integer.toString(4));
|
||||
|
||||
nonExcess.clear();
|
||||
nonExcess.add(excessSSD);
|
||||
nonExcess.add(storages[3]);
|
||||
nonExcess.add(storages[7]);
|
||||
nonExcess.add(storages[8]);
|
||||
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||
excessTypes, storages[3].getDatanodeDescriptor(),
|
||||
storages[7].getDatanodeDescriptor());
|
||||
assertEquals(1, excessReplicas.size());
|
||||
assertTrue(excessReplicas.contains(excessSSD));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.PrintWriter;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
@ -633,7 +634,8 @@ public class TestDNFencing {
|
|||
public DatanodeStorageInfo chooseReplicaToDelete(
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
List<StorageType> excessTypes) {
|
||||
List<StorageType> excessTypes,
|
||||
Map<String, List<DatanodeStorageInfo>> rackMap) {
|
||||
|
||||
Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
|
||||
moreThanOne : exactlyOne;
|
||||
|
|
Loading…
Reference in New Issue