HDFS-8186. Erasure coding: Make block placement policy for EC file configurable. Contributed by Walter Su.

This commit is contained in:
Zhe Zhang 2015-05-20 15:37:50 -07:00
parent 744ef17792
commit e53fa769c9
9 changed files with 95 additions and 54 deletions

View File

@ -244,3 +244,6 @@
HDFS-8427. Remove dataBlockNum and parityBlockNum from BlockInfoStriped. HDFS-8427. Remove dataBlockNum and parityBlockNum from BlockInfoStriped.
(Kai Sasaki via jing9) (Kai Sasaki via jing9)
HDFS-8186. Erasure coding: Make block placement policy for EC file configurable.
(Walter Su via zhz)

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolarent;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
@ -434,6 +435,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class; public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max"; public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
public static final int DFS_REPLICATION_MAX_DEFAULT = 512; public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
public static final String DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY = "dfs.block.placement.ec.classname";
public static final Class<BlockPlacementPolicyRackFaultTolarent> DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT = BlockPlacementPolicyRackFaultTolarent.class;
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval"; public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
public static final int DFS_DF_INTERVAL_DEFAULT = 60000; public static final int DFS_DF_INTERVAL_DEFAULT = 60000;

View File

@ -47,8 +47,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -199,7 +199,9 @@ public class Balancer {
*/ */
private static void checkReplicationPolicyCompatibility(Configuration conf private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException { ) throws UnsupportedActionException {
if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof BlockPlacementPolicies placementPolicies =
new BlockPlacementPolicies(conf, null, null, null);
if (!(placementPolicies.getPolicy(false) instanceof
BlockPlacementPolicyDefault)) { BlockPlacementPolicyDefault)) {
throw new UnsupportedActionException( throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault"); "Balancer without BlockPlacementPolicyDefault");

View File

@ -273,7 +273,7 @@ public class BlockManager {
private double replicationQueuesInitProgress = 0.0; private double replicationQueuesInitProgress = 0.0;
/** for block replicas placement */ /** for block replicas placement */
private BlockPlacementPolicy blockplacement; private BlockPlacementPolicies placementPolicies;
private final BlockStoragePolicySuite storagePolicySuite; private final BlockStoragePolicySuite storagePolicySuite;
/** Check whether name system is running before terminating */ /** Check whether name system is running before terminating */
@ -297,7 +297,7 @@ public class BlockManager {
// Compute the map capacity by allocating 2% of total memory // Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap( blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap")); LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance( placementPolicies = new BlockPlacementPolicies(
conf, datanodeManager.getFSClusterStats(), conf, datanodeManager.getFSClusterStats(),
datanodeManager.getNetworkTopology(), datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap()); datanodeManager.getHost2DatanodeMap());
@ -490,15 +490,7 @@ public class BlockManager {
@VisibleForTesting @VisibleForTesting
public BlockPlacementPolicy getBlockPlacementPolicy() { public BlockPlacementPolicy getBlockPlacementPolicy() {
return blockplacement; return placementPolicies.getPolicy(false);
}
/** Set BlockPlacementPolicy */
public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
if (newpolicy == null) {
throw new HadoopIllegalArgumentException("newpolicy == null");
}
this.blockplacement = newpolicy;
} }
/** Dump meta data to out. */ /** Dump meta data to out. */
@ -1504,7 +1496,9 @@ public class BlockManager {
// choose replication targets: NOT HOLDING THE GLOBAL LOCK // choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called, // It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the block collection itself. // so for now we pass in the block collection itself.
rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); final BlockPlacementPolicy placementPolicy =
placementPolicies.getPolicy(rw.block.isStriped());
rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
} }
// Step 3: add tasks to the DN // Step 3: add tasks to the DN
@ -1630,7 +1624,7 @@ public class BlockManager {
/** Choose target for WebHDFS redirection. */ /** Choose target for WebHDFS redirection. */
public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) { DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
return blockplacement.chooseTarget(src, 1, clientnode, return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode,
Collections.<DatanodeStorageInfo>emptyList(), false, excludes, Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
blocksize, storagePolicySuite.getDefaultPolicy()); blocksize, storagePolicySuite.getDefaultPolicy());
} }
@ -1642,9 +1636,10 @@ public class BlockManager {
List<DatanodeStorageInfo> chosen, List<DatanodeStorageInfo> chosen,
Set<Node> excludes, Set<Node> excludes,
long blocksize, long blocksize,
byte storagePolicyID) { byte storagePolicyID,
boolean isStriped) {
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
chosen, true, excludes, blocksize, storagePolicy); chosen, true, excludes, blocksize, storagePolicy);
} }
@ -1662,10 +1657,12 @@ public class BlockManager {
final Set<Node> excludedNodes, final Set<Node> excludedNodes,
final long blocksize, final long blocksize,
final List<String> favoredNodes, final List<String> favoredNodes,
final byte storagePolicyID) throws IOException { final byte storagePolicyID,
final boolean isStriped) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors = List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes); getDatanodeDescriptors(favoredNodes);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped);
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize, numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors, storagePolicy); favoredDatanodeDescriptors, storagePolicy);
@ -3088,7 +3085,7 @@ public class BlockManager {
} }
} }
chooseExcessReplicates(nonExcess, block, replication, chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, blockplacement); addedNode, delNodeHint, placementPolicies.getPolicy(false));
} }
@ -4126,4 +4123,5 @@ public class BlockManager {
clearQueues(); clearQueues();
blocksMap.clear(); blocksMap.clear();
} }
} }

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * Aggregates the block placement policies configured for the NameNode:
 * one policy for contiguous (replicated) blocks and one for striped
 * (erasure-coded) blocks. Both concrete policy classes are loaded
 * reflectively from configuration and initialized with the same cluster
 * state, so callers only need to pick the right one via
 * {@link #getPolicy(boolean)}.
 */
public class BlockPlacementPolicies{

  /** Policy used for ordinary replicated (contiguous) blocks. */
  private final BlockPlacementPolicy replicationPolicy;
  /** Policy used for striped (erasure-coded) blocks. */
  private final BlockPlacementPolicy ecPolicy;

  /**
   * Instantiates and initializes both placement policies from configuration.
   *
   * @param conf configuration supplying the policy class names
   * @param stats cluster load statistics handed to each policy
   * @param clusterMap network topology of the cluster
   * @param host2datanodeMap mapping from hosts to datanode descriptors
   */
  public BlockPlacementPolicies(Configuration conf, FSClusterStats stats,
                                NetworkTopology clusterMap,
                                Host2NodesMap host2datanodeMap){
    replicationPolicy = instantiate(
        conf.getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
            DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
            BlockPlacementPolicy.class),
        conf, stats, clusterMap, host2datanodeMap);
    ecPolicy = instantiate(
        conf.getClass(DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
            DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT,
            BlockPlacementPolicy.class),
        conf, stats, clusterMap, host2datanodeMap);
  }

  /** Reflectively constructs and initializes a single placement policy. */
  private static BlockPlacementPolicy instantiate(
      Class<? extends BlockPlacementPolicy> policyClass, Configuration conf,
      FSClusterStats stats, NetworkTopology clusterMap,
      Host2NodesMap host2datanodeMap) {
    final BlockPlacementPolicy policy =
        ReflectionUtils.newInstance(policyClass, conf);
    policy.initialize(conf, stats, clusterMap, host2datanodeMap);
    return policy;
  }

  /**
   * Selects the policy matching the block layout.
   *
   * @param isStriped true for striped (erasure-coded) blocks,
   *                  false for replicated blocks
   * @return the EC policy when {@code isStriped}, otherwise the
   *         replication policy
   */
  public BlockPlacementPolicy getPolicy(boolean isStriped){
    return isStriped ? ecPolicy : replicationPolicy;
  }
}

View File

@ -145,31 +145,7 @@ public abstract class BlockPlacementPolicy {
abstract protected void initialize(Configuration conf, FSClusterStats stats, abstract protected void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap, NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap); Host2NodesMap host2datanodeMap);
/**
* Get an instance of the configured Block Placement Policy based on the
* the configuration property
* {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
*
* @param conf the configuration to be used
* @param stats an object that is used to retrieve the load on the cluster
* @param clusterMap the network topology of the cluster
* @return an instance of BlockPlacementPolicy
*/
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
NetworkTopology clusterMap,
Host2NodesMap host2datanodeMap) {
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
replicatorClass, conf);
replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
return replicator;
}
/** /**
* Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur. * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
* *

View File

@ -174,6 +174,7 @@ class FSDirWriteFileOp {
final short numTargets; final short numTargets;
final byte storagePolicyID; final byte storagePolicyID;
String clientMachine; String clientMachine;
final boolean isStriped;
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsn.dir.resolvePath(pc, src, pathComponents); src = fsn.dir.resolvePath(pc, src, pathComponents);
@ -199,13 +200,13 @@ class FSDirWriteFileOp {
blockSize = pendingFile.getPreferredBlockSize(); blockSize = pendingFile.getPreferredBlockSize();
clientMachine = pendingFile.getFileUnderConstructionFeature() clientMachine = pendingFile.getFileUnderConstructionFeature()
.getClientMachine(); .getClientMachine();
boolean isStriped = pendingFile.isStriped(); isStriped = pendingFile.isStriped();
numTargets = isStriped ? numTargets = isStriped ?
HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
pendingFile.getFileReplication(); pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID(); storagePolicyID = pendingFile.getStoragePolicyID();
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID, return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
clientMachine); clientMachine, isStriped);
} }
static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk, static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
@ -289,7 +290,8 @@ class FSDirWriteFileOp {
// choose targets for the new block to be allocated. // choose targets for the new block to be allocated.
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode, return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize, excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID); favoredNodesList, r.storagePolicyID,
r.isStriped);
} }
/** /**
@ -867,14 +869,16 @@ class FSDirWriteFileOp {
final int numTargets; final int numTargets;
final byte storagePolicyID; final byte storagePolicyID;
final String clientMachine; final String clientMachine;
final boolean isStriped;
ValidateAddBlockResult( ValidateAddBlockResult(
long blockSize, int numTargets, byte storagePolicyID, long blockSize, int numTargets, byte storagePolicyID,
String clientMachine) { String clientMachine, boolean isStriped) {
this.blockSize = blockSize; this.blockSize = blockSize;
this.numTargets = numTargets; this.numTargets = numTargets;
this.storagePolicyID = storagePolicyID; this.storagePolicyID = storagePolicyID;
this.clientMachine = clientMachine; this.clientMachine = clientMachine;
this.isStriped = isStriped;
} }
} }

View File

@ -2935,6 +2935,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final long preferredblocksize; final long preferredblocksize;
final byte storagePolicyID; final byte storagePolicyID;
final List<DatanodeStorageInfo> chosen; final List<DatanodeStorageInfo> chosen;
final boolean isStriped;
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker pc = getPermissionChecker();
@ -2961,6 +2962,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
preferredblocksize = file.getPreferredBlockSize(); preferredblocksize = file.getPreferredBlockSize();
storagePolicyID = file.getStoragePolicyID(); storagePolicyID = file.getStoragePolicyID();
isStriped = file.isStriped();
//find datanode storages //find datanode storages
final DatanodeManager dm = blockManager.getDatanodeManager(); final DatanodeManager dm = blockManager.getDatanodeManager();
@ -2976,7 +2978,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// choose new datanodes. // choose new datanodes.
final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode( final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
src, numAdditionalNodes, clientnode, chosen, src, numAdditionalNodes, clientnode, chosen,
excludes, preferredblocksize, storagePolicyID); excludes, preferredblocksize, storagePolicyID, isStriped);
final LocatedBlock lb = BlockManager.newLocatedBlock( final LocatedBlock lb = BlockManager.newLocatedBlock(
blk, targets, -1, false); blk, targets, -1, false);
blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY); blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);

View File

@ -65,9 +65,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@ -171,7 +170,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
private final PrintWriter out; private final PrintWriter out;
private List<String> snapshottableDirs = null; private List<String> snapshottableDirs = null;
private final BlockPlacementPolicy bpPolicy; private final BlockPlacementPolicies bpPolicies;
private StoragePolicySummary storageTypeSummary = null; private StoragePolicySummary storageTypeSummary = null;
/** /**
@ -193,7 +192,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
this.out = out; this.out = out;
this.totalDatanodes = totalDatanodes; this.totalDatanodes = totalDatanodes;
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null, this.bpPolicies = new BlockPlacementPolicies(conf, null,
networktopology, networktopology,
namenode.getNamesystem().getBlockManager().getDatanodeManager() namenode.getNamesystem().getBlockManager().getDatanodeManager()
.getHost2DatanodeMap()); .getHost2DatanodeMap());
@ -601,7 +600,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
} }
// count mis replicated blocks // count mis replicated blocks
BlockPlacementStatus blockPlacementStatus = bpPolicy BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(false)
.verifyBlockPlacement(path, lBlk, targetFileReplication); .verifyBlockPlacement(path, lBlk, targetFileReplication);
if (!blockPlacementStatus.isPlacementPolicySatisfied()) { if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
res.numMisReplicatedBlocks++; res.numMisReplicatedBlocks++;