From 85d04dc46494c5b627920bbc021f0515af8f753e Mon Sep 17 00:00:00 2001
From: Ming Ma
Date: Tue, 24 Nov 2015 10:30:24 -0800
Subject: [PATCH] HDFS-9314. Improve BlockPlacementPolicyDefault's picking of
 excess replicas. (Xiao Chen via mingma)

(cherry picked from commit 0e54b164a8d8acf09aca8712116bf7a554cb4846)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../BlockPlacementPolicyDefault.java          | 32 ++++++--
 ...BlockPlacementPolicyRackFaultTolerant.java |  8 ++
 .../BlockPlacementPolicyWithNodeGroup.java    |  3 +-
 ...BlockPlacementPolicyWithUpgradeDomain.java | 19 +++--
 .../TestReplicationPolicy.java                | 82 +++++++++++++++----
 .../TestReplicationPolicyWithNodeGroup.java   |  6 +-
 ...estReplicationPolicyWithUpgradeDomain.java | 32 ++++++++
 .../server/namenode/ha/TestDNFencing.java     |  4 +-
 9 files changed, 153 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 421240bd637..5cacca31765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -811,6 +811,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-7988. Replace usage of ExactSizeInputStream with LimitInputStream.
     (Walter Su via wheat9)
 
+    HDFS-9314. Improve BlockPlacementPolicyDefault's picking of excess
+    replicas. (Xiao Chen via mingma)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 13b17e36208..08e7851fa52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -916,7 +916,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   public DatanodeStorageInfo chooseReplicaToDelete(
       Collection<DatanodeStorageInfo> moreThanOne,
       Collection<DatanodeStorageInfo> exactlyOne,
-      final List<StorageType> excessTypes) {
+      final List<StorageType> excessTypes,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
     long oldestHeartbeat =
         monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeStorageInfo oldestHeartbeatStorage = null;
@@ -926,7 +927,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
     for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanOne,
-        exactlyOne)) {
+        exactlyOne, rackMap)) {
       if (!excessTypes.contains(storage.getStorageType())) {
         continue;
       }
@@ -991,7 +992,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           moreThanOne, exactlyOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes);
+        cur = chooseReplicaToDelete(moreThanOne, exactlyOne, excessTypes,
+            rackMap);
       }
       firstOne = false;
       if (cur == null) {
@@ -1044,16 +1046,34 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     splitNodesWithRack(locs, rackMap, moreThanOne, exactlyOne);
     return notReduceNumOfGroups(moreThanOne, source, target);
   }
+
   /**
    * Pick up replica node set for deleting replica as over-replicated.
    * First set contains replica nodes on rack with more than one
    * replica while second set contains remaining replica nodes.
-   * So pick up first set if not empty. If first is empty, then pick second.
+   * If only 1 rack, pick all. If 2 racks, pick all that have more than
+   * 1 replica on the same rack; if no such replicas, pick all.
+   * If 3 or more racks, pick all.
    */
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
       Collection<DatanodeStorageInfo> moreThanOne,
-      Collection<DatanodeStorageInfo> exactlyOne) {
-    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
+      Collection<DatanodeStorageInfo> exactlyOne,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
+    Collection<DatanodeStorageInfo> ret = new ArrayList<>();
+    if (rackMap.size() == 2) {
+      for (List<DatanodeStorageInfo> dsi : rackMap.values()) {
+        if (dsi.size() >= 2) {
+          ret.addAll(dsi);
+        }
+      }
+    }
+    if (ret.isEmpty()) {
+      // Return all replicas if rackMap.size() != 2
+      // or rackMap.size() == 2 but no shared replicas on any rack
+      ret.addAll(moreThanOne);
+      ret.addAll(exactlyOne);
+    }
+    return ret;
   }
 
   @VisibleForTesting
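The hunk above is the heart of the change: with exactly two racks, only replicas that share their rack with another replica may be removed, so a deletion can no longer collapse the block onto a single rack. The following self-contained sketch restates that rule outside of HDFS; the class name, the plain String node IDs, and the candidatesForDeletion helper are illustrative only and are not part of the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ExcessReplicaCandidates {
      /**
       * Mirrors the rule added to pickupReplicaSet above: with exactly two
       * racks, only replicas that share a rack with another replica are
       * candidates; otherwise (one rack, three or more racks, or two racks
       * where no rack holds two replicas) every replica is a candidate.
       */
      static Collection<String> candidatesForDeletion(
          Map<String, List<String>> rackMap) {
        Collection<String> ret = new ArrayList<>();
        if (rackMap.size() == 2) {
          for (List<String> replicasOnRack : rackMap.values()) {
            if (replicasOnRack.size() >= 2) {
              ret.addAll(replicasOnRack);
            }
          }
        }
        if (ret.isEmpty()) {
          // No rack pair to protect: every replica may be considered.
          for (List<String> replicasOnRack : rackMap.values()) {
            ret.addAll(replicasOnRack);
          }
        }
        return ret;
      }

      public static void main(String[] args) {
        Map<String, List<String>> rackMap = new HashMap<>();
        rackMap.put("/rack1", Arrays.asList("ssdReplica"));
        rackMap.put("/rack3", Arrays.asList("d4", "d5"));
        // Two racks and only /rack3 holds two replicas, so only d4 and d5 are
        // candidates; deleting one of them cannot reduce the block to one rack.
        System.out.println(candidatesForDeletion(rackMap));
      }
    }

Feeding it the two-rack layout from the final SSD scenario in TestReplicationPolicy further down (the SSD replica alone on one rack, storages[4] and storages[5] on the other) returns only the pair that shares a rack, which is why that scenario expects no SSD deletion.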
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index 8ca0d2b0a16..c803b97f63d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -169,4 +169,12 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
       racks.add(dn.getNetworkLocation());
     return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
   }
+
+  @Override
+  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
+    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
+  }
 }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 0481e07a9ba..ad1799e7977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -334,7 +334,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   @Override
   public Collection<DatanodeStorageInfo> pickupReplicaSet(
       Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
+      Collection<DatanodeStorageInfo> second,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
       return second;

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
index 8d6b13c8cbd..37fb9714858 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithUpgradeDomain.java
@@ -206,18 +206,18 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
    *    shareRackNotUDSet} >= 3. Removing a node from shareUDNotRackSet
    *    will reduce the # of racks by 1 and won't change # of upgrade
    *    domains.
-   *    Note that this is different from BlockPlacementPolicyDefault which
-   *    will keep the # of racks after deletion. With upgrade domain policy,
-   *    given # of racks is still >= 2 after deletion, the data availability
-   *    model remains the same as BlockPlacementPolicyDefault (only supports
-   *    one rack failure).
+   *    Note that this is similar to BlockPlacementPolicyDefault which
+   *    will at most reduce the # of racks by 1, and never reduce it to < 2.
+   *    With upgrade domain policy, given # of racks is still >= 2 after
+   *    deletion, the data availability model remains the same as
+   *    BlockPlacementPolicyDefault (only supports one rack failure).
    *    For example, assume we have 4 replicas: d1(rack1, ud1),
    *    d2(rack2, ud1), d3(rack3, ud3), d4(rack3, ud4). Thus we have
    *    shareUDNotRackSet: {d1, d2} and shareRackNotUDSet: {d3, d4}.
    *    With upgrade domain policy, the remaining replicas after deletion
    *    are {d1(or d2), d3, d4} which has 2 racks.
-   *    With BlockPlacementPolicyDefault policy, the remaining replicas
-   *    after deletion are {d1, d2, d3(or d4)} which has 3 racks.
+   *    With BlockPlacementPolicyDefault policy, whichever of the 4 is in the
+   *    worst condition is deleted, which in the worst case leaves 2 racks.
    * 3. shareUDNotRackSet isn't empty and shareRackNotUDSet is empty. This
    *    implies all replicas are on unique racks. Removing a node from
    *    shareUDNotRackSet will reduce # of racks (no different from
@@ -244,7 +244,8 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
   @Override
   protected Collection<DatanodeStorageInfo> pickupReplicaSet(
       Collection<DatanodeStorageInfo> moreThanOne,
-      Collection<DatanodeStorageInfo> exactlyOne) {
+      Collection<DatanodeStorageInfo> exactlyOne,
+      Map<String, List<DatanodeStorageInfo>> rackMap) {
     // shareUDSet includes DatanodeStorageInfo that share same upgrade
     // domain with another DatanodeStorageInfo.
     Collection<DatanodeStorageInfo> all = combine(moreThanOne, exactlyOne);
@@ -255,7 +256,7 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
     List<DatanodeStorageInfo> shareRackAndUDSet = new ArrayList<>();
     if (shareUDSet.size() == 0) {
       // All upgrade domains are unique, use the parent set.
-      return super.pickupReplicaSet(moreThanOne, exactlyOne);
+      return super.pickupReplicaSet(moreThanOne, exactlyOne, rackMap);
     } else if (moreThanOne != null) {
       for (DatanodeStorageInfo storage : shareUDSet) {
         if (moreThanOne.contains(storage)) {
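The updated javadoc above reasons about the d1..d4 example in terms of two sets, shareUDNotRackSet and shareRackNotUDSet. The sketch below simply recomputes those two groupings for that example so the argument can be checked in isolation; the Replica class and the grouping code are illustrative and are not the patch's API.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class UpgradeDomainExample {
      static class Replica {
        final String name;
        final String rack;
        final String upgradeDomain;
        Replica(String name, String rack, String upgradeDomain) {
          this.name = name;
          this.rack = rack;
          this.upgradeDomain = upgradeDomain;
        }
      }

      public static void main(String[] args) {
        // The example from the javadoc: d1(rack1, ud1), d2(rack2, ud1),
        // d3(rack3, ud3), d4(rack3, ud4).
        List<Replica> replicas = Arrays.asList(
            new Replica("d1", "rack1", "ud1"),
            new Replica("d2", "rack2", "ud1"),
            new Replica("d3", "rack3", "ud3"),
            new Replica("d4", "rack3", "ud4"));

        Map<String, List<Replica>> byRack = new HashMap<>();
        Map<String, List<Replica>> byUd = new HashMap<>();
        for (Replica r : replicas) {
          byRack.computeIfAbsent(r.rack, k -> new ArrayList<>()).add(r);
          byUd.computeIfAbsent(r.upgradeDomain, k -> new ArrayList<>()).add(r);
        }

        // d1 and d2 share an upgrade domain but not a rack (shareUDNotRackSet);
        // d3 and d4 share a rack but not an upgrade domain (shareRackNotUDSet).
        // Deleting from {d1, d2} keeps two racks and loses no upgrade domain,
        // which is the choice the javadoc argues for.
        for (Replica r : replicas) {
          System.out.println(r.name
              + " sharesUD=" + (byUd.get(r.upgradeDomain).size() > 1)
              + " sharesRack=" + (byRack.get(r.rack).size() > 1));
        }
      }
    }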
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 3723a6c9722..93c1b75413e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -972,22 +972,22 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       // test returning null
       excessTypes.add(StorageType.SSD);
       assertNull(((BlockPlacementPolicyDefault) replicator)
-          .chooseReplicaToDelete(first, second, excessTypes));
+          .chooseReplicaToDelete(first, second, excessTypes, rackMap));
     }
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete(first, second, excessTypes);
-    // Within first set, storages[1] with less free space
-    assertEquals(chosen, storages[1]);
+        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
+    // Within all storages, storages[5] with least free space
+    assertEquals(chosen, storages[5]);
 
     replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
-    assertEquals(0, first.size());
-    assertEquals(3, second.size());
-    // Within second set, storages[5] with less free space
+    assertEquals(2, first.size());
+    assertEquals(1, second.size());
+    // Within first set, storages[1] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        first, second, excessTypes);
-    assertEquals(chosen, storages[5]);
+        first, second, excessTypes, rackMap);
+    assertEquals(chosen, storages[1]);
   }
 
   @Test
@@ -1032,17 +1032,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
         excessTypes, storages[3].getDatanodeDescriptor(), null);
     assertTrue(excessReplicas.contains(excessStorage));
-
     // The block was initially created on excessSSD(rack r1),
     // storages[4](rack r3) and storages[5](rack r3) with
-    // ONESSD_STORAGE_POLICY_NAME storage policy.
+    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
     // Right after balancer moves the block from storages[5] to
     // storages[3](rack r2), the application changes the storage policy from
     // ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
-    // no replica can be chosen as the excessive replica as
-    // chooseReplicasToDelete only considers storages[4] and storages[5] that
-    // are the same rack. But neither's storage type is SSD.
-    // TODO BlockPlacementPolicyDefault should be able to delete excessSSD.
+    // we should be able to delete excessSSD since the remaining
+    // storages ({storages[3]}, {storages[4], storages[5]})
+    // are on different racks (r2, r3).
     nonExcess.clear();
     nonExcess.add(excessSSD);
     nonExcess.add(storages[3]);
     nonExcess.add(storages[4]);
     nonExcess.add(storages[5]);
     excessTypes = storagePolicy.chooseExcess((short) 3,
         DatanodeStorageInfo.toStorageTypes(nonExcess));
@@ -1053,7 +1051,59 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
         excessTypes, storages[3].getDatanodeDescriptor(),
         storages[5].getDatanodeDescriptor());
-    assertTrue(excessReplicas.size() == 0);
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // Similar to above, but after policy change and before deletion,
+    // the replicas are located on excessSSD(rack r1), storages[1](rack r1),
+    // storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
+    // In this case, we should be able to delete excessSSD since the remaining
+    // storages ({storages[1]}, {storages[2], storages[3]})
+    // are on different racks (r1, r2).
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[2]);
+    nonExcess.add(storages[3]);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[1].getDatanodeDescriptor(),
+        storages[3].getDatanodeDescriptor());
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // Similar to above, but after policy change and before deletion,
+    // the replicas are located on excessSSD(rack r1) and storages[2](rack r2).
+    // Replication factor = 1. We should be able to delete excessSSD.
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[2]);
+    excessTypes = storagePolicy.chooseExcess((short) 1,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 1,
+        excessTypes, storages[2].getDatanodeDescriptor(), null);
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
+
+    // The block was initially created on excessSSD(rack r1),
+    // storages[4](rack r3) and storages[5](rack r3) with
+    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
+    // In this case, no replica can be chosen as the excessive replica by
+    // chooseReplicasToDelete because if the SSD storage is deleted,
+    // the remaining storages[4] and storages[5] are the same rack (r3),
+    // violating block placement policy (i.e. the number of racks >= 2).
+    // TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
+    // and then delete excessSSD.
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[4]);
+    nonExcess.add(storages[5]);
+    excessTypes = storagePolicy.chooseExcess((short) 2,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 2,
+        excessTypes, null, null);
+    assertEquals(0, excessReplicas.size());
   }
 
   @Test
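The four SSD scenarios added above all turn on the same question: do the replicas left behind after deleting the SSD copy still span enough racks? One rough way to sanity-check the expected assertions, assuming the usual placement invariant (at least two racks whenever at least two replicas remain), is sketched below; the class and method names are hypothetical and only the rack labels come from the test comments.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RemainingRackCheck {
      // A deletion is acceptable when the remaining replicas still span at
      // least min(2, number of remaining replicas) racks.
      static boolean deletionKeepsPlacement(List<String> remainingRacks) {
        Set<String> distinct = new HashSet<>(remainingRacks);
        return distinct.size() >= Math.min(2, remainingRacks.size());
      }

      public static void main(String[] args) {
        // Scenario 1: drop excessSSD(r1); storages[3](r2), storages[4..5](r3) stay.
        System.out.println(deletionKeepsPlacement(
            Arrays.asList("r2", "r3", "r3")));  // true
        // Scenario 2: drop excessSSD(r1); storages[1](r1), storages[2..3](r2) stay.
        System.out.println(deletionKeepsPlacement(
            Arrays.asList("r1", "r2", "r2")));  // true
        // Scenario 3: drop excessSSD(r1); only storages[2](r2) stays (factor 1).
        System.out.println(deletionKeepsPlacement(
            Arrays.asList("r2")));              // true
        // Scenario 4: drop excessSSD(r1); storages[4..5](r3) share one rack.
        System.out.println(deletionKeepsPlacement(
            Arrays.asList("r3", "r3")));        // false, the TODO case
      }
    }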
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index 367faea0e37..b46983c2b3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -544,7 +544,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     List<StorageType> excessTypes = new ArrayList<>();
     excessTypes.add(StorageType.DEFAULT);
     DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
-        .chooseReplicaToDelete(first, second, excessTypes);
+        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
     // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
     // dataNodes[0] and dataNodes[1] are in the same nodegroup,
     // but dataNodes[1] is chosen as less free space
@@ -557,7 +557,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     // as less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        first, second, excessTypes);
+        first, second, excessTypes, rackMap);
     assertEquals(chosen, storages[2]);
     replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
@@ -566,7 +566,7 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
     // Within second set, dataNodes[5] with less free space
     excessTypes.add(StorageType.DEFAULT);
     chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
-        first, second, excessTypes);
+        first, second, excessTypes, rackMap);
     assertEquals(chosen, storages[5]);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
index 608817f302b..c939220701f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithUpgradeDomain.java
@@ -389,6 +389,38 @@ public class TestReplicationPolicyWithUpgradeDomain
     assertTrue(excessReplicas.size() == 2);
     assertTrue(excessReplicas.contains(storages[0]));
     assertTrue(excessReplicas.contains(excessStorage));
+
+    // Test SSD related deletion. With different rack settings here, but
+    // similar to {@link TestReplicationPolicy#testChooseReplicasToDelete}.
+    // The block was initially created on excessSSD(rack r1, UD 4),
+    // storages[7](rack r3, UD 2) and storages[8](rack r3, UD 3) with
+    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
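The comment in this hunk claims that once excessSSD goes away the remaining replicas still cover two racks and three upgrade domains, so both placement goals survive the deletion. That is quick to verify by hand; the tiny sketch below does the counting for the stated locations (the Loc class is illustrative and not part of the test):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class UpgradeDomainRemainderCheck {
      static final class Loc {
        final String rack;
        final String upgradeDomain;
        Loc(String rack, String upgradeDomain) {
          this.rack = rack;
          this.upgradeDomain = upgradeDomain;
        }
      }

      public static void main(String[] args) {
        // What is left after excessSSD(r1, UD 4) is deleted in the scenario:
        // storages[3](r2, UD 1), storages[7](r3, UD 2), storages[8](r3, UD 3).
        List<Loc> remaining = Arrays.asList(
            new Loc("r2", "1"), new Loc("r3", "2"), new Loc("r3", "3"));
        Set<String> racks = new HashSet<>();
        Set<String> upgradeDomains = new HashSet<>();
        for (Loc loc : remaining) {
          racks.add(loc.rack);
          upgradeDomains.add(loc.upgradeDomain);
        }
        // Two racks and three upgrade domains remain, so dropping the SSD
        // replica keeps both the rack and the upgrade-domain goals intact.
        System.out.println("racks=" + racks.size()
            + " upgradeDomains=" + upgradeDomains.size());
      }
    }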
+    // Right after balancer moves the block from storages[7] to
+    // storages[3](rack r2, UD 1), the application changes the storage policy
+    // from ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
+    // we should be able to delete excessSSD since the remaining
+    // storages ({storages[3]}, {storages[7], storages[8]})
+    // are on different racks (r2, r3) and different UDs (1, 2, 3).
+    DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
+        "Storage-excess-SSD-ID", "localhost",
+        storages[0].getDatanodeDescriptor().getNetworkLocation(), "foo.com",
+        StorageType.SSD, null);
+    DatanodeStorageInfo[] ssds = { excessSSD };
+    DatanodeDescriptor ssdNodes[] = DFSTestUtil.toDatanodeDescriptor(ssds);
+    ssdNodes[0].setUpgradeDomain(Integer.toString(4));
+
+    nonExcess.clear();
+    nonExcess.add(excessSSD);
+    nonExcess.add(storages[3]);
+    nonExcess.add(storages[7]);
+    nonExcess.add(storages[8]);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(),
+        storages[7].getDatanodeDescriptor());
+    assertEquals(1, excessReplicas.size());
+    assertTrue(excessReplicas.contains(excessSSD));
   }
 
   @Test

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 74bc73d8e6f..4260d94664d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -633,7 +634,8 @@ public class TestDNFencing {
     public DatanodeStorageInfo chooseReplicaToDelete(
         Collection<DatanodeStorageInfo> moreThanOne,
         Collection<DatanodeStorageInfo> exactlyOne,
-        List<StorageType> excessTypes) {
+        List<StorageType> excessTypes,
+        Map<String, List<DatanodeStorageInfo>> rackMap) {
       Collection<DatanodeStorageInfo> chooseFrom = !moreThanOne.isEmpty() ?
           moreThanOne : exactlyOne;