From ac5e8aed7ca1e9493f96f8795d0caafd5282b9a7 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Wed, 23 Jul 2014 17:25:06 +0000 Subject: [PATCH] HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage types are unavailable. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612880 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../BlockPlacementPolicyDefault.java | 92 +++++++++++++++---- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f1580740f01..d40cd12ccbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -17,6 +17,9 @@ HDFS-6584: Archival Storage HDFS-6679. Bump NameNodeLayoutVersion and update editsStored test files. (vinayakumarb via szetszwo) + HDFS-6686. Change BlockPlacementPolicy to use fallback when some storage + types are unavailable. (szetszwo) + Trunk (Unreleased) INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 9932bff2aa0..b049a462bfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.now; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -218,7 +219,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { boolean avoidStaleNodes = (stats != null && stats.isAvoidingStaleDataNodesForWrite()); final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy); + blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, + EnumSet.noneOf(StorageType.class), results.isEmpty()); if (!returnChosenNodes) { results.removeAll(chosenStorage); } @@ -238,7 +240,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; return new int[] {numOfReplicas, maxNodesPerRack}; } - + + private static List selectStorageTypes( + final BlockStoragePolicy storagePolicy, + final short replication, + final Iterable chosen, + final EnumSet unavailableStorages, + final boolean isNewBlock) { + final List storageTypes = storagePolicy.chooseStorageTypes( + replication, chosen); + final List removed = new ArrayList(); + for(int i = storageTypes.size() - 1; i >= 0; i--) { + // replace/remove unavailable storage types. + final StorageType t = storageTypes.get(i); + if (unavailableStorages.contains(t)) { + final StorageType fallback = isNewBlock? + storagePolicy.getCreationFallback(unavailableStorages) + : storagePolicy.getReplicationFallback(unavailableStorages); + if (fallback == null) { + removed.add(storageTypes.remove(i)); + } else { + storageTypes.set(i, fallback); + } + } + } + if (storageTypes.size() < replication) { + LOG.warn("Failed to place enough replicas: replication is " + replication + + " but only " + storageTypes.size() + " storage types can be selected " + + "(selected=" + storageTypes + + ", unavailable=" + unavailableStorages + + ", removed=" + removed + + ", policy=" + storagePolicy + ")"); + } + return storageTypes; + } /** * choose numOfReplicas from all data nodes * @param numOfReplicas additional number of replicas wanted @@ -257,14 +292,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { final int maxNodesPerRack, final List results, final boolean avoidStaleNodes, - final BlockStoragePolicy storagePolicy) { + final BlockStoragePolicy storagePolicy, + final EnumSet unavailableStorages, + final boolean newBlock) { if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { return writer; } - int totalReplicasExpected = numOfReplicas + results.size(); - - int numOfResults = results.size(); - boolean newBlock = (numOfResults==0); + final int numOfResults = results.size(); + final int totalReplicasExpected = numOfReplicas + numOfResults; if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) { writer = results.get(0).getDatanodeDescriptor(); } @@ -272,12 +307,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // Keep a copy of original excludedNodes final Set oldExcludedNodes = avoidStaleNodes ? new HashSet(excludedNodes) : null; - final List storageTypes = storagePolicy.chooseStorageTypes( - (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results)); + + // choose storage types; use fallbacks for unavailable storages + final List storageTypes = selectStorageTypes(storagePolicy, + (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results), + unavailableStorages, newBlock); + + StorageType curStorageType = null; try { + if ((numOfReplicas = storageTypes.size()) == 0) { + throw new NotEnoughReplicasException( + "All required storage types are unavailable: " + + " unavailableStorages=" + unavailableStorages + + ", storagePolicy=" + storagePolicy); + } + if (numOfResults == 0) { + curStorageType = storageTypes.remove(0); writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0), true) + maxNodesPerRack, results, avoidStaleNodes, curStorageType, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -285,30 +333,33 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); if (numOfResults <= 1) { + curStorageType = storageTypes.remove(0); chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); if (--numOfReplicas == 0) { return writer; } } if (numOfResults <= 2) { final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor(); + curStorageType = storageTypes.remove(0); if (clusterMap.isOnSameRack(dn0, dn1)) { chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else if (newBlock){ chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } else { chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, - results, avoidStaleNodes, storageTypes.remove(0)); + results, avoidStaleNodes, curStorageType); } if (--numOfReplicas == 0) { return writer; } } + curStorageType = storageTypes.remove(0); chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes.remove(0)); + maxNodesPerRack, results, avoidStaleNodes, curStorageType); } catch (NotEnoughReplicasException e) { final String message = "Failed to place enough replicas, still in need of " + (totalReplicasExpected - results.size()) + " to reach " @@ -333,7 +384,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { // if the NotEnoughReplicasException was thrown in chooseRandom(). numOfReplicas = totalReplicasExpected - results.size(); return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, - maxNodesPerRack, results, false, storagePolicy); + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); + } + + if (storageTypes.size() > 0) { + // Retry chooseTarget with fallback storage types + unavailableStorages.add(curStorageType); + return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize, + maxNodesPerRack, results, false, storagePolicy, unavailableStorages, + newBlock); } } return writer;