HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. (Brahma Reddy Battula via mingma)
This commit is contained in:
parent
b37c41fd6e
commit
e27c2ae8ba
|
@ -379,6 +379,13 @@ public class NetworkTopology {
|
||||||
private int depthOfAllLeaves = -1;
|
private int depthOfAllLeaves = -1;
|
||||||
/** rack counter */
|
/** rack counter */
|
||||||
protected int numOfRacks = 0;
|
protected int numOfRacks = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not this cluster has ever consisted of more than 1 rack,
|
||||||
|
* according to the NetworkTopology.
|
||||||
|
*/
|
||||||
|
private boolean clusterEverBeenMultiRack = false;
|
||||||
|
|
||||||
/** the lock used to manage access */
|
/** the lock used to manage access */
|
||||||
protected ReadWriteLock netlock = new ReentrantReadWriteLock();
|
protected ReadWriteLock netlock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
|
@ -417,7 +424,7 @@ public class NetworkTopology {
|
||||||
if (clusterMap.add(node)) {
|
if (clusterMap.add(node)) {
|
||||||
LOG.info("Adding a new node: "+NodeBase.getPath(node));
|
LOG.info("Adding a new node: "+NodeBase.getPath(node));
|
||||||
if (rack == null) {
|
if (rack == null) {
|
||||||
numOfRacks++;
|
incrementRacks();
|
||||||
}
|
}
|
||||||
if (!(node instanceof InnerNode)) {
|
if (!(node instanceof InnerNode)) {
|
||||||
if (depthOfAllLeaves == -1) {
|
if (depthOfAllLeaves == -1) {
|
||||||
|
@ -433,6 +440,13 @@ public class NetworkTopology {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void incrementRacks() {
|
||||||
|
numOfRacks++;
|
||||||
|
if (!clusterEverBeenMultiRack && numOfRacks > 1) {
|
||||||
|
clusterEverBeenMultiRack = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a reference to the node given its string representation.
|
* Return a reference to the node given its string representation.
|
||||||
* Default implementation delegates to {@link #getNode(String)}.
|
* Default implementation delegates to {@link #getNode(String)}.
|
||||||
|
@ -541,6 +555,14 @@ public class NetworkTopology {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if this cluster has ever consisted of multiple racks, even if
|
||||||
|
* it is not now a multi-rack cluster.
|
||||||
|
*/
|
||||||
|
public boolean hasClusterEverBeenMultiRack() {
|
||||||
|
return clusterEverBeenMultiRack;
|
||||||
|
}
|
||||||
|
|
||||||
/** Given a string representation of a rack for a specific network
|
/** Given a string representation of a rack for a specific network
|
||||||
* location
|
* location
|
||||||
*
|
*
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
|
||||||
LOG.info("Adding a new node: " + NodeBase.getPath(node));
|
LOG.info("Adding a new node: " + NodeBase.getPath(node));
|
||||||
if (rack == null) {
|
if (rack == null) {
|
||||||
// We only track rack number here
|
// We only track rack number here
|
||||||
numOfRacks++;
|
incrementRacks();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
|
|
|
@ -1543,6 +1543,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
|
HDFS-9251. Refactor TestWriteToReplica and TestFsDatasetImpl to avoid
|
||||||
explicitly creating Files in the tests code. (lei)
|
explicitly creating Files in the tests code. (lei)
|
||||||
|
|
||||||
|
HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy.
|
||||||
|
(Brahma Reddy Battula via mingma)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -252,9 +252,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
final float blocksInvalidateWorkPct;
|
final float blocksInvalidateWorkPct;
|
||||||
final int blocksReplWorkMultiplier;
|
final int blocksReplWorkMultiplier;
|
||||||
|
|
||||||
/** variable to enable check for enough racks */
|
|
||||||
final boolean shouldCheckForEnoughRacks;
|
|
||||||
|
|
||||||
// whether or not to issue block encryption keys.
|
// whether or not to issue block encryption keys.
|
||||||
final boolean encryptDataTransfer;
|
final boolean encryptDataTransfer;
|
||||||
|
|
||||||
|
@ -355,10 +352,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
conf.getInt(
|
conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
|
||||||
this.shouldCheckForEnoughRacks =
|
|
||||||
conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
|
|
||||||
? false : true;
|
|
||||||
|
|
||||||
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
|
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
|
||||||
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
|
||||||
|
|
||||||
|
@ -382,7 +375,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
LOG.info("maxReplication = " + maxReplication);
|
LOG.info("maxReplication = " + maxReplication);
|
||||||
LOG.info("minReplication = " + minReplication);
|
LOG.info("minReplication = " + minReplication);
|
||||||
LOG.info("maxReplicationStreams = " + maxReplicationStreams);
|
LOG.info("maxReplicationStreams = " + maxReplicationStreams);
|
||||||
LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
|
|
||||||
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
|
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
|
||||||
LOG.info("encryptDataTransfer = " + encryptDataTransfer);
|
LOG.info("encryptDataTransfer = " + encryptDataTransfer);
|
||||||
LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog);
|
LOG.info("maxNumBlocksToLog = " + maxNumBlocksToLog);
|
||||||
|
@ -1531,7 +1523,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
NumberReplicas numReplicas, int pendingReplicaNum, int required) {
|
NumberReplicas numReplicas, int pendingReplicaNum, int required) {
|
||||||
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
|
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
|
||||||
return (numEffectiveReplicas >= required) &&
|
return (numEffectiveReplicas >= required) &&
|
||||||
(pendingReplicaNum > 0 || blockHasEnoughRacks(block, required));
|
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
|
private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
|
||||||
|
@ -1627,7 +1619,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
DatanodeStorageInfo[] targets = rw.getTargets();
|
DatanodeStorageInfo[] targets = rw.getTargets();
|
||||||
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
|
||||||
(!blockHasEnoughRacks(block, requiredReplication)) ) {
|
(!isPlacementPolicySatisfied(block)) ) {
|
||||||
if (rw.getSrcNodes()[0].getNetworkLocation().equals(
|
if (rw.getSrcNodes()[0].getNetworkLocation().equals(
|
||||||
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
|
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
|
||||||
//No use continuing, unless a new rack in this case
|
//No use continuing, unless a new rack in this case
|
||||||
|
@ -3145,8 +3137,8 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
bc.getStoragePolicyID());
|
bc.getStoragePolicyID());
|
||||||
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
|
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
|
||||||
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
|
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||||
chooseExcessReplicasContiguous(bc, nonExcess, storedBlock,
|
chooseExcessReplicasContiguous(nonExcess, storedBlock, replication,
|
||||||
replication, addedNode, delNodeHint, excessTypes);
|
addedNode, delNodeHint, excessTypes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3164,45 +3156,16 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
* If no such a node is available,
|
* If no such a node is available,
|
||||||
* then pick a node with least free space
|
* then pick a node with least free space
|
||||||
*/
|
*/
|
||||||
private void chooseExcessReplicasContiguous(BlockCollection bc,
|
private void chooseExcessReplicasContiguous(
|
||||||
final Collection<DatanodeStorageInfo> nonExcess,
|
final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
|
||||||
BlockInfo storedBlock, short replication,
|
short replication, DatanodeDescriptor addedNode,
|
||||||
DatanodeDescriptor addedNode,
|
DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
|
||||||
DatanodeDescriptor delNodeHint,
|
|
||||||
List<StorageType> excessTypes) {
|
|
||||||
BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
|
BlockPlacementPolicy replicator = placementPolicies.getPolicy(false);
|
||||||
final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
|
List<DatanodeStorageInfo> replicasToDelete = replicator
|
||||||
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
|
.chooseReplicasToDelete(nonExcess, replication, excessTypes,
|
||||||
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
|
addedNode, delNodeHint);
|
||||||
|
for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
|
||||||
// split nodes into two sets
|
processChosenExcessReplica(nonExcess, choosenReplica, storedBlock);
|
||||||
// moreThanOne contains nodes on rack with more than one replica
|
|
||||||
// exactlyOne contains the remaining nodes
|
|
||||||
replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
|
|
||||||
|
|
||||||
// pick one node to delete that favors the delete hint
|
|
||||||
// otherwise pick one with least space from priSet if it is not empty
|
|
||||||
// otherwise one node with least space from remains
|
|
||||||
boolean firstOne = true;
|
|
||||||
final DatanodeStorageInfo delNodeHintStorage
|
|
||||||
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
|
|
||||||
final DatanodeStorageInfo addedNodeStorage
|
|
||||||
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
|
|
||||||
while (nonExcess.size() - replication > 0) {
|
|
||||||
final DatanodeStorageInfo cur;
|
|
||||||
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
|
|
||||||
moreThanOne, excessTypes)) {
|
|
||||||
cur = delNodeHintStorage;
|
|
||||||
} else { // regular excessive replica removal
|
|
||||||
cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
|
|
||||||
moreThanOne, exactlyOne, excessTypes);
|
|
||||||
}
|
|
||||||
firstOne = false;
|
|
||||||
// adjust rackmap, moreThanOne, and exactlyOne
|
|
||||||
replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
|
|
||||||
exactlyOne, cur);
|
|
||||||
|
|
||||||
processChosenExcessReplica(nonExcess, cur, storedBlock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3223,7 +3186,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
|
BlockInfoStriped sblk = (BlockInfoStriped) storedBlock;
|
||||||
short groupSize = sblk.getTotalBlockNum();
|
short groupSize = sblk.getTotalBlockNum();
|
||||||
BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
|
BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true);
|
||||||
List<DatanodeStorageInfo> empty = new ArrayList<>(0);
|
|
||||||
|
|
||||||
// find all duplicated indices
|
// find all duplicated indices
|
||||||
BitSet found = new BitSet(groupSize); //indices found
|
BitSet found = new BitSet(groupSize); //indices found
|
||||||
|
@ -3270,10 +3232,13 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
Block internalBlock = new Block(storedBlock);
|
Block internalBlock = new Block(storedBlock);
|
||||||
internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
|
internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex);
|
||||||
while (candidates.size() > 1) {
|
while (candidates.size() > 1) {
|
||||||
DatanodeStorageInfo target = placementPolicy.chooseReplicaToDelete(bc,
|
List<DatanodeStorageInfo> replicasToDelete = placementPolicy
|
||||||
internalBlock, (short)1, candidates, empty, excessTypes);
|
.chooseReplicasToDelete(candidates, (short) 1, excessTypes, null,
|
||||||
processChosenExcessReplica(nonExcess, target, storedBlock);
|
null);
|
||||||
candidates.remove(target);
|
for (DatanodeStorageInfo chosen : replicasToDelete) {
|
||||||
|
processChosenExcessReplica(nonExcess, chosen, storedBlock);
|
||||||
|
candidates.remove(chosen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
duplicated.clear(targetIndex);
|
duplicated.clear(targetIndex);
|
||||||
}
|
}
|
||||||
|
@ -3299,27 +3264,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
+ "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
|
+ "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if we can use delHint */
|
|
||||||
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
|
|
||||||
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
|
|
||||||
List<StorageType> excessTypes) {
|
|
||||||
if (!isFirst) {
|
|
||||||
return false; // only consider delHint for the first case
|
|
||||||
} else if (delHint == null) {
|
|
||||||
return false; // no delHint
|
|
||||||
} else if (!excessTypes.contains(delHint.getStorageType())) {
|
|
||||||
return false; // delHint storage type is not an excess type
|
|
||||||
} else {
|
|
||||||
// check if removing delHint reduces the number of racks
|
|
||||||
if (moreThan1Racks.contains(delHint)) {
|
|
||||||
return true; // delHint and some other nodes are under the same rack
|
|
||||||
} else if (added != null && !moreThan1Racks.contains(added)) {
|
|
||||||
return true; // the added node adds a new rack
|
|
||||||
}
|
|
||||||
return false; // removing delHint reduces the number of racks;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
|
private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
|
||||||
assert namesystem.hasWriteLock();
|
assert namesystem.hasWriteLock();
|
||||||
LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
|
LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
|
||||||
|
@ -3888,74 +3832,23 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
return invalidateBlocks.contains(dn, block);
|
return invalidateBlocks.contains(dn, block);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
|
boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
|
||||||
if (!this.shouldCheckForEnoughRacks) {
|
List<DatanodeDescriptor> liveNodes = new ArrayList<>();
|
||||||
return true;
|
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
|
||||||
}
|
.getNodes(storedBlock);
|
||||||
Collection<DatanodeDescriptor> corruptNodes =
|
|
||||||
corruptReplicas.getNodes(storedBlock);
|
|
||||||
|
|
||||||
if (storedBlock.isStriped()) {
|
|
||||||
return blockHasEnoughRacksStriped(storedBlock, corruptNodes);
|
|
||||||
} else {
|
|
||||||
return blockHashEnoughRacksContiguous(storedBlock, expectedStorageNum,
|
|
||||||
corruptNodes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify whether given striped block is distributed through enough racks.
|
|
||||||
* As dicussed in HDFS-7613, ec file requires racks at least as many as
|
|
||||||
* the number of data block number.
|
|
||||||
*/
|
|
||||||
boolean blockHasEnoughRacksStriped(BlockInfo storedBlock,
|
|
||||||
Collection<DatanodeDescriptor> corruptNodes) {
|
|
||||||
if (!datanodeManager.hasClusterEverBeenMultiRack()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
boolean enoughRacks = false;
|
|
||||||
Set<String> rackNameSet = new HashSet<>();
|
|
||||||
int dataBlockNum = ((BlockInfoStriped)storedBlock).getRealDataBlockNum();
|
|
||||||
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
|
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
|
||||||
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
|
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
|
||||||
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
|
||||||
if ((corruptNodes == null) || !corruptNodes.contains(cur)) {
|
&& ((corruptNodes == null) || !corruptNodes.contains(cur))) {
|
||||||
String rackNameNew = cur.getNetworkLocation();
|
liveNodes.add(cur);
|
||||||
rackNameSet.add(rackNameNew);
|
|
||||||
if (rackNameSet.size() >= dataBlockNum) {
|
|
||||||
enoughRacks = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return enoughRacks;
|
DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]);
|
||||||
}
|
BlockPlacementPolicy placementPolicy = placementPolicies
|
||||||
|
.getPolicy(storedBlock.isStriped());
|
||||||
boolean blockHashEnoughRacksContiguous(BlockInfo storedBlock,
|
int numReplicas = storedBlock.isStriped() ? ((BlockInfoStriped) storedBlock)
|
||||||
int expectedStorageNum, Collection<DatanodeDescriptor> corruptNodes) {
|
.getRealDataBlockNum() : storedBlock.getReplication();
|
||||||
boolean enoughRacks = false;
|
return placementPolicy.verifyBlockPlacement(locs, numReplicas).isPlacementPolicySatisfied();
|
||||||
String rackName = null;
|
|
||||||
for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
|
|
||||||
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
|
|
||||||
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
|
|
||||||
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
|
|
||||||
if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
|
|
||||||
!datanodeManager.hasClusterEverBeenMultiRack())) {
|
|
||||||
enoughRacks = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
String rackNameNew = cur.getNetworkLocation();
|
|
||||||
if (rackName == null) {
|
|
||||||
rackName = rackNameNew;
|
|
||||||
} else if (!rackName.equals(rackNameNew)) {
|
|
||||||
enoughRacks = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return enoughRacks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3964,7 +3857,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
*/
|
*/
|
||||||
boolean isNeededReplication(BlockInfo storedBlock, int current) {
|
boolean isNeededReplication(BlockInfo storedBlock, int current) {
|
||||||
int expected = getExpectedReplicaNum(storedBlock);
|
int expected = getExpectedReplicaNum(storedBlock);
|
||||||
return current < expected || !blockHasEnoughRacks(storedBlock, expected);
|
return current < expected || !isPlacementPolicySatisfied(storedBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
public short getExpectedReplicaNum(BlockInfo block) {
|
public short getExpectedReplicaNum(BlockInfo block) {
|
||||||
|
|
|
@ -29,13 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface is used for choosing the desired number of targets
|
* This interface is used for choosing the desired number of targets
|
||||||
|
@ -103,37 +99,33 @@ public abstract class BlockPlacementPolicy {
|
||||||
* Verify if the block's placement meets requirement of placement policy,
|
* Verify if the block's placement meets requirement of placement policy,
|
||||||
* i.e. replicas are placed on no less than minRacks racks in the system.
|
* i.e. replicas are placed on no less than minRacks racks in the system.
|
||||||
*
|
*
|
||||||
* @param srcPath the full pathname of the file to be verified
|
* @param locs block with locations
|
||||||
* @param lBlk block with locations
|
|
||||||
* @param numOfReplicas replica number of file to be verified
|
* @param numOfReplicas replica number of file to be verified
|
||||||
* @return the result of verification
|
* @return the result of verification
|
||||||
*/
|
*/
|
||||||
abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
|
abstract public BlockPlacementStatus verifyBlockPlacement(
|
||||||
LocatedBlock lBlk,
|
DatanodeInfo[] locs, int numOfReplicas);
|
||||||
int numOfReplicas);
|
|
||||||
/**
|
|
||||||
* Decide whether deleting the specified replica of the block still makes
|
|
||||||
* the block conform to the configured block placement policy.
|
|
||||||
*
|
|
||||||
* @param srcBC block collection of file to which block-to-be-deleted belongs
|
|
||||||
* @param block The block to be deleted
|
|
||||||
* @param replicationFactor The required number of replicas for this block
|
|
||||||
* @param moreThanOne The replica locations of this block that are present
|
|
||||||
* on more than one unique racks.
|
|
||||||
* @param exactlyOne Replica locations of this block that are present
|
|
||||||
* on exactly one unique racks.
|
|
||||||
* @param excessTypes The excess {@link StorageType}s according to the
|
|
||||||
* {@link BlockStoragePolicy}.
|
|
||||||
* @return the replica that is the best candidate for deletion
|
|
||||||
*/
|
|
||||||
abstract public DatanodeStorageInfo chooseReplicaToDelete(
|
|
||||||
BlockCollection srcBC,
|
|
||||||
Block block,
|
|
||||||
short replicationFactor,
|
|
||||||
Collection<DatanodeStorageInfo> moreThanOne,
|
|
||||||
Collection<DatanodeStorageInfo> exactlyOne,
|
|
||||||
List<StorageType> excessTypes);
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Select the excess replica storages for deletion based on either
|
||||||
|
* delNodehint/Excess storage types.
|
||||||
|
*
|
||||||
|
* @param candidates
|
||||||
|
* available replicas
|
||||||
|
* @param expectedNumOfReplicas
|
||||||
|
* The required number of replicas for this block
|
||||||
|
* @param excessTypes
|
||||||
|
* type of the storagepolicy
|
||||||
|
* @param addedNode
|
||||||
|
* New replica reported
|
||||||
|
* @param delNodeHint
|
||||||
|
* Hint for excess storage selection
|
||||||
|
* @return Returns the list of excess replicas chosen for deletion
|
||||||
|
*/
|
||||||
|
abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
|
||||||
|
Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
|
||||||
|
List<StorageType> excessTypes, DatanodeDescriptor addedNode,
|
||||||
|
DatanodeDescriptor delNodeHint);
|
||||||
/**
|
/**
|
||||||
* Used to setup a BlockPlacementPolicy object. This should be defined by
|
* Used to setup a BlockPlacementPolicy object. This should be defined by
|
||||||
* all implementations of a BlockPlacementPolicy.
|
* all implementations of a BlockPlacementPolicy.
|
||||||
|
|
|
@ -26,9 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
import org.apache.hadoop.net.NodeBase;
|
||||||
|
@ -859,16 +857,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockPlacementStatus verifyBlockPlacement(String srcPath,
|
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
|
||||||
LocatedBlock lBlk, int numberOfReplicas) {
|
int numberOfReplicas) {
|
||||||
DatanodeInfo[] locs = lBlk.getLocations();
|
|
||||||
if (locs == null)
|
if (locs == null)
|
||||||
locs = DatanodeDescriptor.EMPTY_ARRAY;
|
locs = DatanodeDescriptor.EMPTY_ARRAY;
|
||||||
int numRacks = clusterMap.getNumOfRacks();
|
if (!clusterMap.hasClusterEverBeenMultiRack()) {
|
||||||
if(numRacks <= 1) // only one rack
|
// only one rack
|
||||||
return new BlockPlacementStatusDefault(
|
return new BlockPlacementStatusDefault(1, 1);
|
||||||
Math.min(numRacks, numberOfReplicas), numRacks);
|
}
|
||||||
int minRacks = Math.min(2, numberOfReplicas);
|
int minRacks = 2;
|
||||||
|
minRacks = Math.min(minRacks, numberOfReplicas);
|
||||||
// 1. Check that all locations are different.
|
// 1. Check that all locations are different.
|
||||||
// 2. Count locations on different racks.
|
// 2. Count locations on different racks.
|
||||||
Set<String> racks = new TreeSet<String>();
|
Set<String> racks = new TreeSet<String>();
|
||||||
|
@ -876,12 +874,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
racks.add(dn.getNetworkLocation());
|
racks.add(dn.getNetworkLocation());
|
||||||
return new BlockPlacementStatusDefault(racks.size(), minRacks);
|
return new BlockPlacementStatusDefault(racks.size(), minRacks);
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
@Override
|
* Decide whether deleting the specified replica of the block still makes
|
||||||
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
|
* the block conform to the configured block placement policy.
|
||||||
Block block, short replicationFactor,
|
* @param replicationFactor The required number of replicas for this block
|
||||||
Collection<DatanodeStorageInfo> first,
|
* @param moreThanone The replica locations of this block that are present
|
||||||
Collection<DatanodeStorageInfo> second,
|
* on more than one unique racks.
|
||||||
|
* @param exactlyOne Replica locations of this block that are present
|
||||||
|
* on exactly one unique racks.
|
||||||
|
* @param excessTypes The excess {@link StorageType}s according to the
|
||||||
|
* {@link BlockStoragePolicy}.
|
||||||
|
*
|
||||||
|
* @return the replica that is the best candidate for deletion
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
|
||||||
|
Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
|
||||||
final List<StorageType> excessTypes) {
|
final List<StorageType> excessTypes) {
|
||||||
long oldestHeartbeat =
|
long oldestHeartbeat =
|
||||||
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
|
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
|
||||||
|
@ -891,7 +899,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
|
|
||||||
// Pick the node with the oldest heartbeat or with the least free space,
|
// Pick the node with the oldest heartbeat or with the least free space,
|
||||||
// if all hearbeats are within the tolerable heartbeat interval
|
// if all hearbeats are within the tolerable heartbeat interval
|
||||||
for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
|
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
|
||||||
if (!excessTypes.contains(storage.getStorageType())) {
|
if (!excessTypes.contains(storage.getStorageType())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -921,6 +929,76 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||||
return storage;
|
return storage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<DatanodeStorageInfo> chooseReplicasToDelete(
|
||||||
|
Collection<DatanodeStorageInfo> candidates,
|
||||||
|
int expectedNumOfReplicas,
|
||||||
|
List<StorageType> excessTypes,
|
||||||
|
DatanodeDescriptor addedNode,
|
||||||
|
DatanodeDescriptor delNodeHint) {
|
||||||
|
|
||||||
|
List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
|
||||||
|
|
||||||
|
final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
|
||||||
|
|
||||||
|
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
|
||||||
|
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
|
||||||
|
|
||||||
|
// split nodes into two sets
|
||||||
|
// moreThanOne contains nodes on rack with more than one replica
|
||||||
|
// exactlyOne contains the remaining nodes
|
||||||
|
splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne);
|
||||||
|
|
||||||
|
// pick one node to delete that favors the delete hint
|
||||||
|
// otherwise pick one with least space from priSet if it is not empty
|
||||||
|
// otherwise one node with least space from remains
|
||||||
|
boolean firstOne = true;
|
||||||
|
final DatanodeStorageInfo delNodeHintStorage =
|
||||||
|
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint);
|
||||||
|
final DatanodeStorageInfo addedNodeStorage =
|
||||||
|
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode);
|
||||||
|
|
||||||
|
while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
|
||||||
|
final DatanodeStorageInfo cur;
|
||||||
|
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
|
||||||
|
moreThanOne, excessTypes)) {
|
||||||
|
cur = delNodeHintStorage;
|
||||||
|
} else { // regular excessive replica removal
|
||||||
|
cur =
|
||||||
|
chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
|
||||||
|
excessTypes);
|
||||||
|
}
|
||||||
|
firstOne = false;
|
||||||
|
|
||||||
|
// adjust rackmap, moreThanOne, and exactlyOne
|
||||||
|
adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
|
||||||
|
excessReplicas.add(cur);
|
||||||
|
}
|
||||||
|
return excessReplicas;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Check if we can use delHint. */
|
||||||
|
@VisibleForTesting
|
||||||
|
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
|
||||||
|
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
|
||||||
|
List<StorageType> excessTypes) {
|
||||||
|
if (!isFirst) {
|
||||||
|
return false; // only consider delHint for the first case
|
||||||
|
} else if (delHint == null) {
|
||||||
|
return false; // no delHint
|
||||||
|
} else if (!excessTypes.contains(delHint.getStorageType())) {
|
||||||
|
return false; // delHint storage type is not an excess type
|
||||||
|
} else {
|
||||||
|
// check if removing delHint reduces the number of racks
|
||||||
|
if (moreThan1Racks.contains(delHint)) {
|
||||||
|
return true; // delHint and some other nodes are under the same rack
|
||||||
|
} else if (added != null && !moreThan1Racks.contains(added)) {
|
||||||
|
return true; // the added node adds a new rack
|
||||||
|
}
|
||||||
|
return false; // removing delHint reduces the number of racks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pick up replica node set for deleting replica as over-replicated.
|
* Pick up replica node set for deleting replica as over-replicated.
|
||||||
* First set contains replica nodes on rack with more than one
|
* First set contains replica nodes on rack with more than one
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
import org.apache.hadoop.net.NodeBase;
|
||||||
|
|
||||||
|
@ -151,4 +152,21 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD
|
||||||
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
|
||||||
|
int numberOfReplicas) {
|
||||||
|
if (locs == null)
|
||||||
|
locs = DatanodeDescriptor.EMPTY_ARRAY;
|
||||||
|
if (!clusterMap.hasClusterEverBeenMultiRack()) {
|
||||||
|
// only one rack
|
||||||
|
return new BlockPlacementStatusDefault(1, 1);
|
||||||
|
}
|
||||||
|
// 1. Check that all locations are different.
|
||||||
|
// 2. Count locations on different racks.
|
||||||
|
Set<String> racks = new TreeSet<String>();
|
||||||
|
for (DatanodeInfo dn : locs)
|
||||||
|
racks.add(dn.getNetworkLocation());
|
||||||
|
return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ import java.util.Set;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -135,13 +134,13 @@ public class BlockPlacementPolicyWithUpgradeDomain extends
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BlockPlacementStatus verifyBlockPlacement(String srcPath,
|
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
|
||||||
LocatedBlock lBlk, int numberOfReplicas) {
|
int numberOfReplicas) {
|
||||||
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(srcPath,
|
BlockPlacementStatus defaultStatus = super.verifyBlockPlacement(locs,
|
||||||
lBlk, numberOfReplicas);
|
numberOfReplicas);
|
||||||
BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
|
BlockPlacementStatusWithUpgradeDomain upgradeDomainStatus =
|
||||||
new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
|
new BlockPlacementStatusWithUpgradeDomain(defaultStatus,
|
||||||
getUpgradeDomainsFromNodes(lBlk.getLocations()),
|
getUpgradeDomainsFromNodes(locs),
|
||||||
numberOfReplicas, upgradeDomainFactor);
|
numberOfReplicas, upgradeDomainFactor);
|
||||||
return upgradeDomainStatus;
|
return upgradeDomainStatus;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1156,14 +1156,6 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if this cluster has ever consisted of multiple racks, even if
|
|
||||||
* it is not now a multi-rack cluster.
|
|
||||||
*/
|
|
||||||
boolean hasClusterEverBeenMultiRack() {
|
|
||||||
return hasClusterEverBeenMultiRack;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the cluster now consists of multiple racks. If it does, and this
|
* Check if the cluster now consists of multiple racks. If it does, and this
|
||||||
* is the first time it's consisted of multiple racks, then process blocks
|
* is the first time it's consisted of multiple racks, then process blocks
|
||||||
|
|
|
@ -641,8 +641,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
// count mis replicated blocks
|
// count mis replicated blocks
|
||||||
BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(false)
|
BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(
|
||||||
.verifyBlockPlacement(path, lBlk, targetFileReplication);
|
lBlk.isStriped()).verifyBlockPlacement(lBlk.getLocations(),
|
||||||
|
targetFileReplication);
|
||||||
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
|
if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
|
||||||
res.numMisReplicatedBlocks++;
|
res.numMisReplicatedBlocks++;
|
||||||
misReplicatedPerFile++;
|
misReplicatedPerFile++;
|
||||||
|
|
|
@ -360,12 +360,12 @@ public class TestBalancer {
|
||||||
conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
|
conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
|
||||||
|
|
||||||
long[] capacities = new long[] { CAPACITY, CAPACITY };
|
long[] capacities = new long[] { CAPACITY, CAPACITY };
|
||||||
|
String[] hosts = {"host0", "host1"};
|
||||||
String[] racks = { RACK0, RACK1 };
|
String[] racks = { RACK0, RACK1 };
|
||||||
int numOfDatanodes = capacities.length;
|
int numOfDatanodes = capacities.length;
|
||||||
|
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
|
||||||
.hosts(new String[]{"localhost", "localhost"})
|
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
|
||||||
.racks(racks).simulatedCapacities(capacities).build();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
@ -377,7 +377,10 @@ public class TestBalancer {
|
||||||
long totalUsedSpace = totalCapacity * 8 / 10;
|
long totalUsedSpace = totalCapacity * 8 / 10;
|
||||||
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
|
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
|
||||||
for (int i = 0; i < favoredNodes.length; i++) {
|
for (int i = 0; i < favoredNodes.length; i++) {
|
||||||
favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
|
// DFSClient will attempt reverse lookup. In case it resolves
|
||||||
|
// "127.0.0.1" to "localhost", we manually specify the hostname.
|
||||||
|
int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
|
||||||
|
favoredNodes[i] = new InetSocketAddress(hosts[i], port);
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
|
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
|
||||||
|
|
|
@ -824,11 +824,11 @@ public class TestBlockManager {
|
||||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||||
|
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
Assert.assertTrue(BlockManager.useDelHint(true, delHint, null,
|
Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint,
|
||||||
moreThan1Racks, excessTypes));
|
null, moreThan1Racks, excessTypes));
|
||||||
excessTypes.remove(0);
|
excessTypes.remove(0);
|
||||||
excessTypes.add(StorageType.SSD);
|
excessTypes.add(StorageType.SSD);
|
||||||
Assert.assertFalse(BlockManager.useDelHint(true, delHint, null,
|
Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint,
|
||||||
moreThan1Racks, excessTypes));
|
null, moreThan1Racks, excessTypes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||||
|
@ -968,12 +970,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
{
|
{
|
||||||
// test returning null
|
// test returning null
|
||||||
excessTypes.add(StorageType.SSD);
|
excessTypes.add(StorageType.SSD);
|
||||||
assertNull(replicator.chooseReplicaToDelete(
|
assertNull(((BlockPlacementPolicyDefault) replicator)
|
||||||
null, null, (short)3, first, second, excessTypes));
|
.chooseReplicaToDelete((short) 3, first, second, excessTypes));
|
||||||
}
|
}
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
|
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||||
null, null, (short)3, first, second, excessTypes);
|
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
|
||||||
// Within first set, storages[1] with less free space
|
// Within first set, storages[1] with less free space
|
||||||
assertEquals(chosen, storages[1]);
|
assertEquals(chosen, storages[1]);
|
||||||
|
|
||||||
|
@ -982,11 +984,76 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
assertEquals(3, second.size());
|
assertEquals(3, second.size());
|
||||||
// Within second set, storages[5] with less free space
|
// Within second set, storages[5] with less free space
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
chosen = replicator.chooseReplicaToDelete(
|
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||||
null, null, (short)2, first, second, excessTypes);
|
(short)2, first, second, excessTypes);
|
||||||
assertEquals(chosen, storages[5]);
|
assertEquals(chosen, storages[5]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChooseReplicasToDelete() throws Exception {
|
||||||
|
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
|
||||||
|
nonExcess.add(storages[0]);
|
||||||
|
nonExcess.add(storages[1]);
|
||||||
|
nonExcess.add(storages[2]);
|
||||||
|
nonExcess.add(storages[3]);
|
||||||
|
List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
|
||||||
|
BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
|
||||||
|
.createDefaultSuite();
|
||||||
|
BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
|
||||||
|
|
||||||
|
// use delete hint case.
|
||||||
|
|
||||||
|
DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
|
||||||
|
List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||||
|
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||||
|
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||||
|
excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
|
||||||
|
assertTrue(excessReplicas.size() > 0);
|
||||||
|
assertTrue(excessReplicas.contains(storages[0]));
|
||||||
|
|
||||||
|
// Excess type deletion
|
||||||
|
|
||||||
|
DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
|
||||||
|
"Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
|
||||||
|
"foo.com", StorageType.ARCHIVE);
|
||||||
|
nonExcess.add(excessStorage);
|
||||||
|
excessTypes = storagePolicy.chooseExcess((short) 3,
|
||||||
|
DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||||
|
excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
|
||||||
|
excessTypes, storages[3].getDatanodeDescriptor(), null);
|
||||||
|
assertTrue(excessReplicas.contains(excessStorage));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUseDelHint() throws Exception {
|
||||||
|
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||||
|
excessTypes.add(StorageType.ARCHIVE);
|
||||||
|
// only consider delHint for the first case
|
||||||
|
assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null,
|
||||||
|
null));
|
||||||
|
// no delHint
|
||||||
|
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null,
|
||||||
|
null));
|
||||||
|
// delHint storage type is not an excess type
|
||||||
|
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
|
||||||
|
null, excessTypes));
|
||||||
|
// check if removing delHint reduces the number of racks
|
||||||
|
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
|
||||||
|
chosenNodes.add(storages[0]);
|
||||||
|
chosenNodes.add(storages[2]);
|
||||||
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
|
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
|
||||||
|
chosenNodes, excessTypes));
|
||||||
|
// the added node adds a new rack
|
||||||
|
assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
|
||||||
|
storages[5], chosenNodes, excessTypes));
|
||||||
|
// removing delHint reduces the number of racks;
|
||||||
|
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
|
||||||
|
storages[0], chosenNodes, excessTypes));
|
||||||
|
assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null,
|
||||||
|
chosenNodes, excessTypes));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This testcase tests whether the default value returned by
|
* This testcase tests whether the default value returned by
|
||||||
* DFSUtil.getInvalidateWorkPctPerIteration() is positive,
|
* DFSUtil.getInvalidateWorkPctPerIteration() is positive,
|
||||||
|
|
|
@ -533,8 +533,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
||||||
assertEquals(1, second.size());
|
assertEquals(1, second.size());
|
||||||
List<StorageType> excessTypes = new ArrayList<>();
|
List<StorageType> excessTypes = new ArrayList<>();
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
|
DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
|
||||||
null, null, (short)3, first, second, excessTypes);
|
.chooseReplicaToDelete((short) 3, first, second, excessTypes);
|
||||||
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
||||||
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
||||||
// but dataNodes[1] is chosen as less free space
|
// but dataNodes[1] is chosen as less free space
|
||||||
|
@ -546,8 +546,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
||||||
// Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
|
// Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
|
||||||
// as less free space
|
// as less free space
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
chosen = replicator.chooseReplicaToDelete(
|
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||||
null, null, (short)2, first, second, excessTypes);
|
(short) 2, first, second, excessTypes);
|
||||||
assertEquals(chosen, storages[2]);
|
assertEquals(chosen, storages[2]);
|
||||||
|
|
||||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
||||||
|
@ -555,8 +555,8 @@ public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTes
|
||||||
assertEquals(2, second.size());
|
assertEquals(2, second.size());
|
||||||
// Within second set, dataNodes[5] with less free space
|
// Within second set, dataNodes[5] with less free space
|
||||||
excessTypes.add(StorageType.DEFAULT);
|
excessTypes.add(StorageType.DEFAULT);
|
||||||
chosen = replicator.chooseReplicaToDelete(
|
chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
|
||||||
null, null, (short)1, first, second, excessTypes);
|
(short) 1, first, second, excessTypes);
|
||||||
assertEquals(chosen, storages[5]);
|
assertEquals(chosen, storages[5]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,7 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
second.add(storages[8]);
|
second.add(storages[8]);
|
||||||
DatanodeStorageInfo chosenStorage =
|
DatanodeStorageInfo chosenStorage =
|
||||||
upgradeDomainPolicy.chooseReplicaToDelete(
|
upgradeDomainPolicy.chooseReplicaToDelete(
|
||||||
null, null, (short)3, first, second, excessTypes);
|
(short)3, first, second, excessTypes);
|
||||||
assertEquals(chosenStorage, storages[1]);
|
assertEquals(chosenStorage, storages[1]);
|
||||||
first.clear();
|
first.clear();
|
||||||
second.clear();
|
second.clear();
|
||||||
|
@ -219,7 +219,7 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
first.add(storages[4]);
|
first.add(storages[4]);
|
||||||
first.add(storages[5]);
|
first.add(storages[5]);
|
||||||
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
|
chosenStorage = upgradeDomainPolicy.chooseReplicaToDelete(
|
||||||
null, null, (short)3, first, second, excessTypes);
|
(short)3, first, second, excessTypes);
|
||||||
assertTrue(chosenStorage.equals(storages[1]) ||
|
assertTrue(chosenStorage.equals(storages[1]) ||
|
||||||
chosenStorage.equals(storages[4]));
|
chosenStorage.equals(storages[4]));
|
||||||
}
|
}
|
||||||
|
@ -265,7 +265,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[4]);
|
set.add(storages[4]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertFalse(status.isPlacementPolicySatisfied());
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
// 3 upgrade domains (enough), 2 racks (enough)
|
// 3 upgrade domains (enough), 2 racks (enough)
|
||||||
|
@ -275,7 +276,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[5]);
|
set.add(storages[5]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertTrue(status.isPlacementPolicySatisfied());
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
// 3 upgrade domains (enough), 1 rack (not enough)
|
// 3 upgrade domains (enough), 1 rack (not enough)
|
||||||
|
@ -285,7 +287,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[2]);
|
set.add(storages[2]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertFalse(status.isPlacementPolicySatisfied());
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
assertFalse(status.getErrorDescription().contains("upgrade domain"));
|
assertFalse(status.getErrorDescription().contains("upgrade domain"));
|
||||||
|
|
||||||
|
@ -296,7 +299,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[8]);
|
set.add(storages[8]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertFalse(status.isPlacementPolicySatisfied());
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
assertTrue(status.getErrorDescription().contains("upgrade domain"));
|
assertTrue(status.getErrorDescription().contains("upgrade domain"));
|
||||||
|
|
||||||
|
@ -307,7 +311,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[8]);
|
set.add(storages[8]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertTrue(status.isPlacementPolicySatisfied());
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
|
|
||||||
|
@ -319,7 +324,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[8]);
|
set.add(storages[8]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertTrue(status.isPlacementPolicySatisfied());
|
assertTrue(status.isPlacementPolicySatisfied());
|
||||||
|
|
||||||
// 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
|
// 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
|
||||||
|
@ -330,7 +336,8 @@ public class TestReplicationPolicyWithUpgradeDomain
|
||||||
set.add(storages[8]);
|
set.add(storages[8]);
|
||||||
locatedBlock = BlockManager.newLocatedBlock(b,
|
locatedBlock = BlockManager.newLocatedBlock(b,
|
||||||
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
set.toArray(new DatanodeStorageInfo[set.size()]), 0, false);
|
||||||
status = replicator.verifyBlockPlacement("", locatedBlock, set.size());
|
status = replicator.verifyBlockPlacement(locatedBlock.getLocations(),
|
||||||
|
set.size());
|
||||||
assertFalse(status.isPlacementPolicySatisfied());
|
assertFalse(status.isPlacementPolicySatisfied());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,10 +40,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||||
|
@ -631,10 +629,8 @@ public class TestDNFencing {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
|
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
|
||||||
Block block, short replicationFactor,
|
Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
|
||||||
Collection<DatanodeStorageInfo> first,
|
|
||||||
Collection<DatanodeStorageInfo> second,
|
|
||||||
List<StorageType> excessTypes) {
|
List<StorageType> excessTypes) {
|
||||||
|
|
||||||
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
||||||
|
|
Loading…
Reference in New Issue