HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess replicas. Contributed by Xiao Chen.

Change-Id: Idf99293085531165239369155c039b55db0eed83

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
Zhe Zhang 2015-12-04 09:49:43 -08:00
parent 089169e4e5
commit 4c631e8c73
6 changed files with 114 additions and 39 deletions

View File

@ -13,6 +13,10 @@ Release 2.6.4 - UNRELEASED
HDFS-9313. Possible NullPointerException in BlockManager if no excess HDFS-9313. Possible NullPointerException in BlockManager if no excess
replica can be chosen. (mingma) replica can be chosen. (mingma)
HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess
replicas. (Xiao Chen via zhz)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -879,8 +879,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/** /**
* Decide whether deleting the specified replica of the block still makes * Decide whether deleting the specified replica of the block still makes
* the block conform to the configured block placement policy. * the block conform to the configured block placement policy.
* @param replicationFactor The required number of replicas for this block * @param moreThanOne The replica locations of this block that are present
* @param moreThanone The replica locations of this block that are present
* on more than one unique racks. * on more than one unique racks.
* @param exactlyOne Replica locations of this block that are present * @param exactlyOne Replica locations of this block that are present
* on exactly one unique racks. * on exactly one unique racks.
@ -890,9 +889,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* @return the replica that is the best candidate for deletion * @return the replica that is the best candidate for deletion
*/ */
@VisibleForTesting @VisibleForTesting
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, public DatanodeStorageInfo chooseReplicaToDelete(
Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne, Collection<DatanodeStorageInfo> moreThanOne,
final List<StorageType> excessTypes) { Collection<DatanodeStorageInfo> exactlyOne,
final List<StorageType> excessTypes,
Map<String, List<DatanodeStorageInfo>> rackMap) {
long oldestHeartbeat = long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier; now() - heartbeatInterval * tolerateHeartbeatMultiplier;
DatanodeStorageInfo oldestHeartbeatStorage = null; DatanodeStorageInfo oldestHeartbeatStorage = null;
@ -901,7 +902,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// Pick the node with the oldest heartbeat or with the least free space, // Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval // if all hearbeats are within the tolerable heartbeat interval
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
exactlyOne, rackMap)) {
if (!excessTypes.contains(storage.getStorageType())) { if (!excessTypes.contains(storage.getStorageType())) {
continue; continue;
} }
@ -966,9 +968,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
moreThanOne, excessTypes)) { moreThanOne, excessTypes)) {
cur = delNodeHintStorage; cur = delNodeHintStorage;
} else { // regular excessive replica removal } else { // regular excessive replica removal
cur = cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes,
chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, rackMap);
excessTypes);
} }
firstOne = false; firstOne = false;
if (cur == null) { if (cur == null) {
@ -1010,12 +1011,29 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* Pick up replica node set for deleting replica as over-replicated. * Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one * First set contains replica nodes on rack with more than one
* replica while second set contains remaining replica nodes. * replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second. * If only 1 rack, pick all. If 2 racks, pick all that have more than
* 1 replicas on the same rack; if no such replicas, pick all.
* If 3 or more racks, pick all.
*/ */
protected Collection<DatanodeStorageInfo> pickupReplicaSet( protected Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> moreThanOne,
Collection<DatanodeStorageInfo> second) { Collection<DatanodeStorageInfo> exactlyOne,
return first.isEmpty() ? second : first; Map<String, List<DatanodeStorageInfo>> rackMap) {
Collection<DatanodeStorageInfo> ret = new ArrayList<>();
if (rackMap.size() == 2) {
for (List<DatanodeStorageInfo> dsi : rackMap.values()) {
if (dsi.size() >= 2) {
ret.addAll(dsi);
}
}
}
if (ret.isEmpty()) {
// Return all replicas if rackMap.size() != 2
// or rackMap.size() == 2 but no shared replicas on any rack
ret.addAll(moreThanOne);
ret.addAll(exactlyOne);
}
return ret;
} }
@VisibleForTesting @VisibleForTesting

View File

@ -304,7 +304,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
@Override @Override
public Collection<DatanodeStorageInfo> pickupReplicaSet( public Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> first,
Collection<DatanodeStorageInfo> second) { Collection<DatanodeStorageInfo> second,
Map<String, List<DatanodeStorageInfo>> rackMap) {
// If no replica within same rack, return directly. // If no replica within same rack, return directly.
if (first.isEmpty()) { if (first.isEmpty()) {
return second; return second;

View File

@ -1029,22 +1029,22 @@ public class TestReplicationPolicy {
// test returning null // test returning null
excessTypes.add(StorageType.SSD); excessTypes.add(StorageType.SSD);
assertNull(((BlockPlacementPolicyDefault) replicator) assertNull(((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes)); .chooseReplicaToDelete(first, second, excessTypes, rackMap));
} }
excessTypes.add(StorageType.DEFAULT); excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes); .chooseReplicaToDelete(first, second, excessTypes, rackMap);
// Within first set, storages[1] with less free space // Within all storages, storages[5] with least free space
assertEquals(chosen, storages[1]); assertEquals(chosen, storages[5]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
assertEquals(0, first.size()); assertEquals(2, first.size());
assertEquals(3, second.size()); assertEquals(1, second.size());
// Within second set, storages[5] with less free space // Within first set, storages[1] with less free space
excessTypes.add(StorageType.DEFAULT); excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short)2, first, second, excessTypes); first, second, excessTypes, rackMap);
assertEquals(chosen, storages[5]); assertEquals(chosen, storages[1]);
} }
@Test @Test
@ -1089,17 +1089,15 @@ public class TestReplicationPolicy {
excessTypes, storages[3].getDatanodeDescriptor(), null); excessTypes, storages[3].getDatanodeDescriptor(), null);
assertTrue(excessReplicas.contains(excessStorage)); assertTrue(excessReplicas.contains(excessStorage));
// The block was initially created on excessSSD(rack r1), // The block was initially created on excessSSD(rack r1),
// storages[4](rack r3) and storages[5](rack r3) with // storages[4](rack r3) and storages[5](rack r3) with
// ONESSD_STORAGE_POLICY_NAME storage policy. // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
// Right after balancer moves the block from storages[5] to // Right after balancer moves the block from storages[5] to
// storages[3](rack r2), the application changes the storage policy from // storages[3](rack r2), the application changes the storage policy from
// ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case, // ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
// no replica can be chosen as the excessive replica as // we should be able to delete excessSSD since the remaining
// chooseReplicasToDelete only considers storages[4] and storages[5] that // storages ({storages[3]}, {storages[4], storages[5]})
// are the same rack. But neither's storage type is SSD. // are on different racks (r2, r3).
// TODO BlockPlacementPolicyDefault should be able to delete excessSSD.
nonExcess.clear(); nonExcess.clear();
nonExcess.add(excessSSD); nonExcess.add(excessSSD);
nonExcess.add(storages[3]); nonExcess.add(storages[3]);
@ -1110,7 +1108,59 @@ public class TestReplicationPolicy {
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3, excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[3].getDatanodeDescriptor(), excessTypes, storages[3].getDatanodeDescriptor(),
storages[5].getDatanodeDescriptor()); storages[5].getDatanodeDescriptor());
assertTrue(excessReplicas.size() == 0); assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// Similar to above, but after policy change and before deletion,
// the replicas are located on excessSSD(rack r1), storages[1](rack r1),
// storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
// In this case, we should be able to delete excessSSD since the remaining
// storages ({storages[1]} , {storages[2], storages[3]})
// are on different racks (r1, r2).
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[1]);
nonExcess.add(storages[2]);
nonExcess.add(storages[3]);
excessTypes = storagePolicy.chooseExcess((short) 3,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
excessTypes, storages[1].getDatanodeDescriptor(),
storages[3].getDatanodeDescriptor());
assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// Similar to above, but after policy change and before deletion,
// the replicas are located on excessSSD(rack r1), storages[2](rack r2)
// Replication factor = 1. We should be able to delete excessSSD.
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[2]);
excessTypes = storagePolicy.chooseExcess((short) 1,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1,
excessTypes, storages[2].getDatanodeDescriptor(), null);
assertEquals(1, excessReplicas.size());
assertTrue(excessReplicas.contains(excessSSD));
// The block was initially created on excessSSD(rack r1),
// storages[4](rack r3) and storages[5](rack r3) with
// ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
// In this case, no replica can be chosen as the excessive replica by
// chooseReplicasToDelete because if the SSD storage is deleted,
// the remaining storages[4] and storages[5] are the same rack (r3),
// violating block placement policy (i.e. the number of racks >= 2).
// TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
// and then delete excessSSD.
nonExcess.clear();
nonExcess.add(excessSSD);
nonExcess.add(storages[4]);
nonExcess.add(storages[5]);
excessTypes = storagePolicy.chooseExcess((short) 2,
DatanodeStorageInfo.toStorageTypes(nonExcess));
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2,
excessTypes, null, null);
assertEquals(0, excessReplicas.size());
} }
@Test @Test

View File

@ -616,7 +616,7 @@ public class TestReplicationPolicyWithNodeGroup {
List<StorageType> excessTypes = new ArrayList<StorageType>(); List<StorageType> excessTypes = new ArrayList<StorageType>();
excessTypes.add(StorageType.DEFAULT); excessTypes.add(StorageType.DEFAULT);
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator) DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
.chooseReplicaToDelete((short) 3, first, second, excessTypes); .chooseReplicaToDelete(first, second, excessTypes, rackMap);
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]}, // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
// dataNodes[0] and dataNodes[1] are in the same nodegroup, // dataNodes[0] and dataNodes[1] are in the same nodegroup,
// but dataNodes[1] is chosen as less free space // but dataNodes[1] is chosen as less free space
@ -629,7 +629,7 @@ public class TestReplicationPolicyWithNodeGroup {
// as less free space // as less free space
excessTypes.add(StorageType.DEFAULT); excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short) 2, first, second, excessTypes); first, second, excessTypes, rackMap);
assertEquals(chosen, storages[2]); assertEquals(chosen, storages[2]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
@ -638,7 +638,7 @@ public class TestReplicationPolicyWithNodeGroup {
// Within second set, dataNodes[5] with less free space // Within second set, dataNodes[5] with less free space
excessTypes.add(StorageType.DEFAULT); excessTypes.add(StorageType.DEFAULT);
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete( chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
(short) 1, first, second, excessTypes); first, second, excessTypes, rackMap);
assertEquals(chosen, storages[5]); assertEquals(chosen, storages[5]);
} }

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -619,12 +620,13 @@ public class TestDNFencing {
} }
@Override @Override
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, public DatanodeStorageInfo chooseReplicaToDelete(
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second, Collection<DatanodeStorageInfo> moreThanOne,
List<StorageType> excessTypes) { Collection<DatanodeStorageInfo> exactlyOne,
List<StorageType> excessTypes,
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second; Map<String, List<DatanodeStorageInfo>> rackMap) {
Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
moreThanOne : exactlyOne;
List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom); List<DatanodeStorageInfo> l = Lists.newArrayList(chooseFrom);
return l.get(DFSUtil.getRandom().nextInt(l.size())); return l.get(DFSUtil.getRandom().nextInt(l.size()));
} }