diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 610a5eb103b..3bdff6febee 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -244,3 +244,6 @@
 
     HDFS-8427. Remove dataBlockNum and parityBlockNum from BlockInfoStriped.
     (Kai Sasaki via jing9)
+
+    HDFS-8186. Erasure coding: Make block placement policy for EC file configurable.
+    (Walter Su via zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d5d30955b5b..8e6b9f0b936 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.http.HttpConfig;
@@ -434,6 +435,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
   public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
   public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
+  public static final String DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY = "dfs.block.placement.ec.classname";
+  public static final Class<BlockPlacementPolicyRackFaultTolarent> DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT = BlockPlacementPolicyRackFaultTolarent.class;
   public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
   public static final int DFS_DF_INTERVAL_DEFAULT = 60000;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index bc7e4489e0b..d756f2b5c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -47,8 +47,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -199,7 +199,9 @@
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof
+    BlockPlacementPolicies placementPolicies =
+        new BlockPlacementPolicies(conf, null, null, null);
+    if (!(placementPolicies.getPolicy(false) instanceof
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index fc1396580dc..79cbcc6e8db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -273,7 +273,7 @@
   private double replicationQueuesInitProgress = 0.0;
 
   /** for block replicas placement */
-  private BlockPlacementPolicy blockplacement;
+  private BlockPlacementPolicies placementPolicies;
   private final BlockStoragePolicySuite storagePolicySuite;
 
   /** Check whether name system is running before terminating */
@@ -297,7 +297,7 @@
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
-    blockplacement = BlockPlacementPolicy.getInstance(
+    placementPolicies = new BlockPlacementPolicies(
       conf, datanodeManager.getFSClusterStats(),
       datanodeManager.getNetworkTopology(),
       datanodeManager.getHost2DatanodeMap());
@@ -490,15 +490,7 @@
 
   @VisibleForTesting
   public BlockPlacementPolicy getBlockPlacementPolicy() {
-    return blockplacement;
-  }
-
-  /** Set BlockPlacementPolicy */
-  public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
-    if (newpolicy == null) {
-      throw new HadoopIllegalArgumentException("newpolicy == null");
-    }
-    this.blockplacement = newpolicy;
+    return placementPolicies.getPolicy(false);
   }
 
   /** Dump meta data to out. */
@@ -1504,7 +1496,9 @@
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
       // It is costly to extract the filename for which chooseTargets is called,
       // so for now we pass in the block collection itself.
-      rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
+      final BlockPlacementPolicy placementPolicy =
+          placementPolicies.getPolicy(rw.block.isStriped());
+      rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
     }
 
     // Step 3: add tasks to the DN
@@ -1630,7 +1624,7 @@
   /** Choose target for WebHDFS redirection. */
   public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
       DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
-    return blockplacement.chooseTarget(src, 1, clientnode,
+    return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode,
         Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
         blocksize, storagePolicySuite.getDefaultPolicy());
   }
@@ -1642,9 +1636,10 @@
       List<DatanodeStorageInfo> chosen,
       Set<Node> excludes,
       long blocksize,
-      byte storagePolicyID) {
-
+      byte storagePolicyID,
+      boolean isStriped) {
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
+    final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
     return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
         chosen, true, excludes, blocksize, storagePolicy);
   }
@@ -1662,10 +1657,12 @@
       final Set<Node> excludedNodes,
       final long blocksize,
       final List<String> favoredNodes,
-      final byte storagePolicyID) throws IOException {
+      final byte storagePolicyID,
+      final boolean isStriped) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
     final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
+    final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize,
         favoredDatanodeDescriptors, storagePolicy);
@@ -3088,7 +3085,7 @@
       }
     }
     chooseExcessReplicates(nonExcess, block, replication,
-        addedNode, delNodeHint, blockplacement);
+        addedNode, delNodeHint, placementPolicies.getPolicy(false));
   }
 
@@ -4126,4 +4123,5 @@
     clearQueues();
     blocksMap.clear();
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
new file mode 100644
index 00000000000..622b2581fdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BlockPlacementPolicies{
+
+  private final BlockPlacementPolicy replicationPolicy;
+  private final BlockPlacementPolicy ecPolicy;
+
+  public BlockPlacementPolicies(Configuration conf, FSClusterStats stats,
+                                NetworkTopology clusterMap,
+                                Host2NodesMap host2datanodeMap){
+    final Class<? extends BlockPlacementPolicy> replicatorClass = conf
+        .getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    replicationPolicy = ReflectionUtils.newInstance(replicatorClass, conf);
+    replicationPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+    final Class<? extends BlockPlacementPolicy> blockPlacementECClass =
+        conf.getClass(DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    ecPolicy = ReflectionUtils.newInstance(blockPlacementECClass, conf);
+    ecPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+  }
+
+  public BlockPlacementPolicy getPolicy(boolean isStriped){
+    if (isStriped) {
+      return ecPolicy;
+    } else {
+      return replicationPolicy;
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 9696179185e..86aaf79cedc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -145,31 +145,7 @@
   abstract protected void initialize(Configuration conf, FSClusterStats stats,
                                      NetworkTopology clusterMap,
                                      Host2NodesMap host2datanodeMap);
-
-  /**
-   * Get an instance of the configured Block Placement Policy based on the
-   * the configuration property
-   * {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
-   *
-   * @param conf the configuration to be used
-   * @param stats an object that is used to retrieve the load on the cluster
-   * @param clusterMap the network topology of the cluster
-   * @return an instance of BlockPlacementPolicy
-   */
-  public static BlockPlacementPolicy getInstance(Configuration conf,
-      FSClusterStats stats,
-      NetworkTopology clusterMap,
-      Host2NodesMap host2datanodeMap) {
-    final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
-        BlockPlacementPolicy.class);
-    final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
-        replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
-    return replicator;
-  }
-
+
   /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 6137fd51a5c..11b6d8f2f4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -174,6 +174,7 @@
     final short numTargets;
     final byte storagePolicyID;
     String clientMachine;
+    final boolean isStriped;
 
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
@@ -199,13 +200,13 @@
     blockSize = pendingFile.getPreferredBlockSize();
     clientMachine = pendingFile.getFileUnderConstructionFeature()
         .getClientMachine();
-    boolean isStriped = pendingFile.isStriped();
+    isStriped = pendingFile.isStriped();
     numTargets = isStriped ?
         HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
         pendingFile.getFileReplication();
     storagePolicyID = pendingFile.getStoragePolicyID();
     return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
-        clientMachine);
+        clientMachine, isStriped);
   }
 
   static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
@@ -289,7 +290,8 @@
     // choose targets for the new block to be allocated.
     return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                     excludedNodesSet, r.blockSize,
-                                    favoredNodesList, r.storagePolicyID);
+                                    favoredNodesList, r.storagePolicyID,
+                                    r.isStriped);
   }
 
   /**
@@ -867,14 +869,16 @@
     final int numTargets;
     final byte storagePolicyID;
     final String clientMachine;
+    final boolean isStriped;
 
     ValidateAddBlockResult(
         long blockSize, int numTargets, byte storagePolicyID,
-        String clientMachine) {
+        String clientMachine, boolean isStriped) {
       this.blockSize = blockSize;
       this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
+      this.isStriped = isStriped;
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7e45e9076c7..e9bb2f79f03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2935,6 +2935,7 @@
     final long preferredblocksize;
     final byte storagePolicyID;
     final List<DatanodeStorageInfo> chosen;
+    final boolean isStriped;
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -2961,6 +2962,7 @@
       clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
       storagePolicyID = file.getStoragePolicyID();
+      isStriped = file.isStriped();
 
       //find datanode storages
       final DatanodeManager dm = blockManager.getDatanodeManager();
@@ -2976,7 +2978,7 @@
     // choose new datanodes.
     final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
         src, numAdditionalNodes, clientnode, chosen,
-        excludes, preferredblocksize, storagePolicyID);
+        excludes, preferredblocksize, storagePolicyID, isStriped);
     final LocatedBlock lb = BlockManager.newLocatedBlock(
         blk, targets, -1, false);
     blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index fccef1795cf..10673943dd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -65,9 +65,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -171,7 +170,7 @@
   private final PrintWriter out;
   private List<String> snapshottableDirs = null;
 
-  private final BlockPlacementPolicy bpPolicy;
+  private final BlockPlacementPolicies bpPolicies;
   private StoragePolicySummary storageTypeSummary = null;
 
   /**
@@ -193,7 +192,7 @@
     this.out = out;
     this.totalDatanodes = totalDatanodes;
     this.remoteAddress = remoteAddress;
-    this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
+    this.bpPolicies = new BlockPlacementPolicies(conf, null,
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
@@ -601,7 +600,7 @@
       }
 
      // count mis replicated blocks
-      BlockPlacementStatus blockPlacementStatus = bpPolicy
+      BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(false)
          .verifyBlockPlacement(path, lBlk, targetFileReplication);
       if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
         res.numMisReplicatedBlocks++;