HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy. Contributed by Brahma Reddy Battula. Branch-2.7 patch contributed by Xiao Chen.
Change-Id: I1b32627bc2a2f30f5debeaba7663fb2777958079
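The patch moves the "does this block cover enough racks?" decision out of BlockManager and behind the BlockPlacementPolicy abstraction: BlockManager.blockHasEnoughRacks() becomes isPlacementPolicySatisfied(), which collects the live replica locations and asks blockplacement.verifyBlockPlacement(DatanodeInfo[], int); excess-replica selection moves into BlockPlacementPolicy.chooseReplicasToDelete(); and NetworkTopology now remembers whether the cluster has ever had more than one rack (hasClusterEverBeenMultiRack()). Below is a minimal, self-contained sketch of the rack check the default policy applies; the types here are simplified stand-ins, not the real Hadoop classes, and the real policy additionally treats a cluster that has never been multi-rack as trivially satisfied.

import java.util.HashSet;
import java.util.Set;

class PlacementSketch {
  // Stand-in for org.apache.hadoop.hdfs.protocol.DatanodeInfo; only the rack matters here.
  static class DatanodeInfo {
    final String networkLocation;
    DatanodeInfo(String networkLocation) { this.networkLocation = networkLocation; }
  }

  // Stand-in for BlockPlacementStatus / BlockPlacementStatusDefault.
  static class PlacementStatus {
    final int currentRacks, requiredRacks;
    PlacementStatus(int currentRacks, int requiredRacks) {
      this.currentRacks = currentRacks;
      this.requiredRacks = requiredRacks;
    }
    boolean isPlacementPolicySatisfied() { return currentRacks >= requiredRacks; }
  }

  // Rough shape of BlockPlacementPolicyDefault.verifyBlockPlacement(DatanodeInfo[], int):
  // a block with more than one replica should span at least two racks.
  static PlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, int numberOfReplicas) {
    int minRacks = Math.min(2, numberOfReplicas);
    Set<String> racks = new HashSet<>();
    for (DatanodeInfo dn : locs) {
      racks.add(dn.networkLocation);
    }
    return new PlacementStatus(racks.size(), minRacks);
  }

  public static void main(String[] args) {
    DatanodeInfo[] oneRack = {
        new DatanodeInfo("/rack1"), new DatanodeInfo("/rack1"), new DatanodeInfo("/rack1") };
    DatanodeInfo[] twoRacks = {
        new DatanodeInfo("/rack1"), new DatanodeInfo("/rack1"), new DatanodeInfo("/rack2") };
    System.out.println(verifyBlockPlacement(oneRack, 3).isPlacementPolicySatisfied());  // false
    System.out.println(verifyBlockPlacement(twoRacks, 3).isPlacementPolicySatisfied()); // true
  }
}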
commit 1ae8a8342e (parent 7404506111)
NetworkTopology.java

@@ -53,9 +53,9 @@ import com.google.common.collect.Lists;
public class NetworkTopology {
  public final static String DEFAULT_RACK = "/default-rack";
  public final static int DEFAULT_HOST_LEVEL = 2;
-  public static final Log LOG =
+  public static final Log LOG =
      LogFactory.getLog(NetworkTopology.class);

  public static class InvalidTopologyException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    public InvalidTopologyException(String msg) {

@@ -379,6 +379,13 @@ public class NetworkTopology {
  private int depthOfAllLeaves = -1;
  /** rack counter */
  protected int numOfRacks = 0;

+  /**
+   * Whether or not this cluster has ever consisted of more than 1 rack,
+   * according to the NetworkTopology.
+   */
+  private boolean clusterEverBeenMultiRack = false;
+
  /** the lock used to manage access */
  protected ReadWriteLock netlock = new ReentrantReadWriteLock();

@@ -418,7 +425,7 @@ public class NetworkTopology {
    if (clusterMap.add(node)) {
      LOG.info("Adding a new node: "+NodeBase.getPath(node));
      if (rack == null) {
-        numOfRacks++;
+        incrementRacks();
      }
      if (!(node instanceof InnerNode)) {
        if (depthOfAllLeaves == -1) {

@@ -433,7 +440,14 @@ public class NetworkTopology {
      netlock.writeLock().unlock();
    }
  }

+  protected void incrementRacks() {
+    numOfRacks++;
+    if (!clusterEverBeenMultiRack && numOfRacks > 1) {
+      clusterEverBeenMultiRack = true;
+    }
+  }
+
  /**
   * Return a reference to the node given its string representation.
   * Default implementation delegates to {@link #getNode(String)}.

@@ -541,10 +555,18 @@ public class NetworkTopology {
      netlock.readLock().unlock();
    }
  }

+  /**
+   * @return true if this cluster has ever consisted of multiple racks, even if
+   *         it is not now a multi-rack cluster.
+   */
+  public boolean hasClusterEverBeenMultiRack() {
+    return clusterEverBeenMultiRack;
+  }
+
  /** Given a string representation of a rack for a specific network
   * location
   *
   *
   * To be overridden in subclasses for specific NetworkTopology
   * implementations, as alternative to overriding the full
   * {@link #getRack(String)} method.
NetworkTopologyWithNodeGroup.java

@@ -205,7 +205,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
        LOG.info("Adding a new node: " + NodeBase.getPath(node));
        if (rack == null) {
          // We only track rack number here
-          numOfRacks++;
+          incrementRacks();
        }
      }
      if(LOG.isDebugEnabled()) {
CHANGES.txt

@@ -8,6 +8,9 @@ Release 2.7.3 - UNRELEASED

  IMPROVEMENTS

+    HDFS-8647. Abstract BlockManager's rack policy into BlockPlacementPolicy.
+    (Brahma Reddy Battula via mingma)
+
  OPTIMIZATIONS

  BUG FIXES
BlockManager.java

@@ -1364,7 +1364,7 @@ public class BlockManager {

          if (numEffectiveReplicas >= requiredReplication) {
            if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block)) ) {
+                 (isPlacementPolicySatisfied(block)) ) {
              neededReplications.remove(block, priority); // remove from neededReplications
              neededReplications.decrementReplicationIndex(priority);
              blockLog.info("BLOCK* Removing {} from neededReplications as" +

@@ -1435,7 +1435,7 @@ public class BlockManager {

          if (numEffectiveReplicas >= requiredReplication) {
            if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block)) ) {
+                 (isPlacementPolicySatisfied(block)) ) {
              neededReplications.remove(block, priority); // remove from neededReplications
              neededReplications.decrementReplicationIndex(priority);
              rw.targets = null;

@@ -1446,7 +1446,7 @@ public class BlockManager {
          }

          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-               (!blockHasEnoughRacks(block)) ) {
+               (!isPlacementPolicySatisfied(block)) ) {
            if (rw.srcNode.getNetworkLocation().equals(
                targets[0].getDatanodeDescriptor().getNetworkLocation())) {
              //No use continuing, unless a new rack in this case
@@ -2902,107 +2902,46 @@ public class BlockManager {
      }
    }
    chooseExcessReplicates(nonExcess, block, replication,
-        addedNode, delNodeHint, blockplacement);
+        addedNode, delNodeHint);
  }

-  /**
-   * We want "replication" replicates for the block, but we now have too many.
-   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
-   *
-   * srcNodes.size() - dstNodes.size() == replication
-   *
-   * We pick node that make sure that replicas are spread across racks and
-   * also try hard to pick one with least free space.
-   * The algorithm is first to pick a node with least free space from nodes
-   * that are on a rack holding more than one replicas of the block.
-   * So removing such a replica won't remove a rack.
-   * If no such a node is available,
-   * then pick a node with least free space
-   */
-  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess,
-      Block b, short replication,
-      DatanodeDescriptor addedNode,
-      DatanodeDescriptor delNodeHint,
-      BlockPlacementPolicy replicator) {
+  private void chooseExcessReplicates(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      Block b, short replication,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
    assert namesystem.hasWriteLock();
    // first form a rack to datanodes map and
    BlockCollection bc = getBlockCollection(b);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
+        bc.getStoragePolicyID());
    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));

-    final Map<String, List<DatanodeStorageInfo>> rackMap
-        = new HashMap<String, List<DatanodeStorageInfo>>();
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
-
-    // split nodes into two sets
-    // moreThanOne contains nodes on rack with more than one replica
-    // exactlyOne contains the remaining nodes
-    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-
-    // pick one node to delete that favors the delete hint
-    // otherwise pick one with least space from priSet if it is not empty
-    // otherwise one node with least space from remains
-    boolean firstOne = true;
-    final DatanodeStorageInfo delNodeHintStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
-    final DatanodeStorageInfo addedNodeStorage
-        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
-    while (nonExcess.size() - replication > 0) {
-      final DatanodeStorageInfo cur;
-      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
-          moreThanOne, excessTypes)) {
-        cur = delNodeHintStorage;
-      } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            moreThanOne, exactlyOne, excessTypes);
-      }
-      firstOne = false;
-
-      // adjust rackmap, moreThanOne, and exactlyOne
-      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
-          exactlyOne, cur);
-
-      nonExcess.remove(cur);
-      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
-
-      //
-      // The 'excessblocks' tracks blocks until we get confirmation
-      // that the datanode has deleted them; the only way we remove them
-      // is when we get a "removeBlock" message.
-      //
-      // The 'invalidate' list is used to inform the datanode the block
-      // should be deleted. Items are removed from the invalidate list
-      // upon giving instructions to the namenode.
-      //
-      addToInvalidates(b, cur.getDatanodeDescriptor());
-      blockLog.info("BLOCK* chooseExcessReplicates: "
-          +"({}, {}) is added to invalidated blocks set", cur, b);
+    List<DatanodeStorageInfo> replicasToDelete = blockplacement
+        .chooseReplicasToDelete(nonExcess, replication, excessTypes,
+            addedNode, delNodeHint);
+    for (DatanodeStorageInfo choosenReplica : replicasToDelete) {
+      processChosenExcessReplica(nonExcess, choosenReplica, b);
    }
  }

-  /** Check if we can use delHint */
-  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
-      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
-      List<StorageType> excessTypes) {
-    if (!isFirst) {
-      return false; // only consider delHint for the first case
-    } else if (delHint == null) {
-      return false; // no delHint
-    } else if (!excessTypes.contains(delHint.getStorageType())) {
-      return false; // delHint storage type is not an excess type
-    } else {
-      // check if removing delHint reduces the number of racks
-      if (moreThan1Racks.contains(delHint)) {
-        return true; // delHint and some other nodes are under the same rack
-      } else if (added != null && !moreThan1Racks.contains(added)) {
-        return true; // the added node adds a new rack
-      }
-      return false; // removing delHint reduces the number of racks;
-    }
-  }
+  private void processChosenExcessReplica(
+      final Collection<DatanodeStorageInfo> nonExcess,
+      final DatanodeStorageInfo chosen, Block b) {
+    nonExcess.remove(chosen);
+    addToExcessReplicate(chosen.getDatanodeDescriptor(), b);
+    //
+    // The 'excessblocks' tracks blocks until we get confirmation
+    // that the datanode has deleted them; the only way we remove them
+    // is when we get a "removeBlock" message.
+    //
+    // The 'invalidate' list is used to inform the datanode the block
+    // should be deleted. Items are removed from the invalidate list
+    // upon giving instructions to the datanodes.
+    //
+    addToInvalidates(b, chosen.getDatanodeDescriptor());
+    blockLog.debug("BLOCK* chooseExcessReplicates: "
+        + "({}, {}) is added to invalidated blocks set", chosen, b);
+  }

  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -3484,33 +3423,20 @@ public class BlockManager {
    return toInvalidate.size();
  }

-  boolean blockHasEnoughRacks(Block b) {
-    boolean enoughRacks = false;;
-    Collection<DatanodeDescriptor> corruptNodes =
-        corruptReplicas.getNodes(b);
-    int numExpectedReplicas = getReplication(b);
-    String rackName = null;
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+  boolean isPlacementPolicySatisfied(Block b) {
+    List<DatanodeDescriptor> liveNodes = new ArrayList<>();
+    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
+        .getNodes(b);
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (numExpectedReplicas == 1 ||
-              (numExpectedReplicas > 1 &&
-                  !datanodeManager.hasClusterEverBeenMultiRack())) {
-            enoughRacks = true;
-            break;
-          }
-          String rackNameNew = cur.getNetworkLocation();
-          if (rackName == null) {
-            rackName = rackNameNew;
-          } else if (!rackName.equals(rackNameNew)) {
-            enoughRacks = true;
-            break;
-          }
-        }
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
+          && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
+        liveNodes.add(cur);
      }
    }
-    return enoughRacks;
+    DatanodeInfo[] locs = liveNodes.toArray(new DatanodeInfo[liveNodes.size()]);
+    return blockplacement.verifyBlockPlacement(locs,
+        getReplication(b)).isPlacementPolicySatisfied();
  }

  /**

@@ -3518,7 +3444,7 @@ public class BlockManager {
   * or if it does not have enough racks.
   */
  boolean isNeededReplication(Block b, int expected, int current) {
-    return current < expected || !blockHasEnoughRacks(b);
+    return current < expected || !isPlacementPolicySatisfied(b);
  }

  public long getMissingBlocksCount() {
BlockPlacementPolicy.java

@@ -30,9 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;

@@ -103,37 +101,33 @@ public abstract class BlockPlacementPolicy {
   * Verify if the block's placement meets requirement of placement policy,
   * i.e. replicas are placed on no less than minRacks racks in the system.
   *
-   * @param srcPath the full pathname of the file to be verified
-   * @param lBlk block with locations
+   * @param locs block with locations
   * @param numOfReplicas replica number of file to be verified
   * @return the result of verification
   */
-  abstract public BlockPlacementStatus verifyBlockPlacement(String srcPath,
-      LocatedBlock lBlk,
-      int numOfReplicas);
-  /**
-   * Decide whether deleting the specified replica of the block still makes
-   * the block conform to the configured block placement policy.
-   *
-   * @param srcBC block collection of file to which block-to-be-deleted belongs
-   * @param block The block to be deleted
-   * @param replicationFactor The required number of replicas for this block
-   * @param moreThanOne The replica locations of this block that are present
-   *                    on more than one unique racks.
-   * @param exactlyOne Replica locations of this block that are present
-   *                   on exactly one unique racks.
-   * @param excessTypes The excess {@link StorageType}s according to the
-   *                    {@link BlockStoragePolicy}.
-   * @return the replica that is the best candidate for deletion
-   */
-  abstract public DatanodeStorageInfo chooseReplicaToDelete(
-      BlockCollection srcBC,
-      Block block,
-      short replicationFactor,
-      Collection<DatanodeStorageInfo> moreThanOne,
-      Collection<DatanodeStorageInfo> exactlyOne,
-      List<StorageType> excessTypes);
+  abstract public BlockPlacementStatus verifyBlockPlacement(
+      DatanodeInfo[] locs, int numOfReplicas);

+  /**
+   * Select the excess replica storages for deletion based on either
+   * delNodehint/Excess storage types.
+   *
+   * @param candidates
+   *          available replicas
+   * @param expectedNumOfReplicas
+   *          The required number of replicas for this block
+   * @param excessTypes
+   *          type of the storagepolicy
+   * @param addedNode
+   *          New replica reported
+   * @param delNodeHint
+   *          Hint for excess storage selection
+   * @return Returns the list of excess replicas chosen for deletion
+   */
+  abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
+      Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
+      List<StorageType> excessTypes, DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint);

  /**
   * Used to setup a BlockPlacementPolicy object. This should be defined by
   * all implementations of a BlockPlacementPolicy.
BlockPlacementPolicyDefault.java

@@ -27,10 +27,8 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;

@@ -869,16 +867,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
  }

  @Override
-  public BlockPlacementStatus verifyBlockPlacement(String srcPath,
-      LocatedBlock lBlk, int numberOfReplicas) {
-    DatanodeInfo[] locs = lBlk.getLocations();
+  public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
+      int numberOfReplicas) {
    if (locs == null)
      locs = DatanodeDescriptor.EMPTY_ARRAY;
-    int numRacks = clusterMap.getNumOfRacks();
-    if(numRacks <= 1) // only one rack
-      return new BlockPlacementStatusDefault(
-          Math.min(numRacks, numberOfReplicas), numRacks);
-    int minRacks = Math.min(2, numberOfReplicas);
+    if (!clusterMap.hasClusterEverBeenMultiRack()) {
+      // only one rack
+      return new BlockPlacementStatusDefault(1, 1);
+    }
+    int minRacks = 2;
+    minRacks = Math.min(minRacks, numberOfReplicas);
    // 1. Check that all locations are different.
    // 2. Count locations on different racks.
    Set<String> racks = new TreeSet<String>();
@@ -886,12 +884,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
      racks.add(dn.getNetworkLocation());
    return new BlockPlacementStatusDefault(racks.size(), minRacks);
  }

-  @Override
-  public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
-      Block block, short replicationFactor,
-      Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second,
+  /**
+   * Decide whether deleting the specified replica of the block still makes
+   * the block conform to the configured block placement policy.
+   * @param replicationFactor The required number of replicas for this block
+   * @param moreThanone The replica locations of this block that are present
+   *                    on more than one unique racks.
+   * @param exactlyOne Replica locations of this block that are present
+   *                   on exactly one unique racks.
+   * @param excessTypes The excess {@link StorageType}s according to the
+   *                    {@link BlockStoragePolicy}.
+   *
+   * @return the replica that is the best candidate for deletion
+   */
+  @VisibleForTesting
+  public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
+      Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne,
      final List<StorageType> excessTypes) {
    long oldestHeartbeat =
        monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;

@@ -901,7 +909,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {

    // Pick the node with the oldest heartbeat or with the least free space,
    // if all hearbeats are within the tolerable heartbeat interval
-    for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+    for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) {
      if (!excessTypes.contains(storage.getStorageType())) {
        continue;
      }
@@ -931,6 +939,76 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    return storage;
  }

+  @Override
+  public List<DatanodeStorageInfo> chooseReplicasToDelete(
+      Collection<DatanodeStorageInfo> candidates,
+      int expectedNumOfReplicas,
+      List<StorageType> excessTypes,
+      DatanodeDescriptor addedNode,
+      DatanodeDescriptor delNodeHint) {
+
+    List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
+
+    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
+
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
+
+    // split nodes into two sets
+    // moreThanOne contains nodes on rack with more than one replica
+    // exactlyOne contains the remaining nodes
+    splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne);
+
+    // pick one node to delete that favors the delete hint
+    // otherwise pick one with least space from priSet if it is not empty
+    // otherwise one node with least space from remains
+    boolean firstOne = true;
+    final DatanodeStorageInfo delNodeHintStorage =
+        DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint);
+    final DatanodeStorageInfo addedNodeStorage =
+        DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode);
+
+    while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) {
+      final DatanodeStorageInfo cur;
+      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
+          moreThanOne, excessTypes)) {
+        cur = delNodeHintStorage;
+      } else { // regular excessive replica removal
+        cur =
+            chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne,
+                excessTypes);
+      }
+      firstOne = false;
+
+      // adjust rackmap, moreThanOne, and exactlyOne
+      adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
+      excessReplicas.add(cur);
+    }
+    return excessReplicas;
+  }
+
+  /** Check if we can use delHint. */
+  @VisibleForTesting
+  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
+      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
+      List<StorageType> excessTypes) {
+    if (!isFirst) {
+      return false; // only consider delHint for the first case
+    } else if (delHint == null) {
+      return false; // no delHint
+    } else if (!excessTypes.contains(delHint.getStorageType())) {
+      return false; // delHint storage type is not an excess type
+    } else {
+      // check if removing delHint reduces the number of racks
+      if (moreThan1Racks.contains(delHint)) {
+        return true; // delHint and some other nodes are under the same rack
+      } else if (added != null && !moreThan1Racks.contains(added)) {
+        return true; // the added node adds a new rack
+      }
+      return false; // removing delHint reduces the number of racks;
+    }
+  }
+
  /**
   * Pick up replica node set for deleting replica as over-replicated.
   * First set contains replica nodes on rack with more than one
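BlockPlacementPolicyDefault.chooseReplicasToDelete() above now owns the loop that used to live in BlockManager.chooseExcessReplicates(). A toy, self-contained sketch of its rack-preserving core follows; the names here are made up for illustration, and the real method additionally honours the delete hint, excess storage types, heartbeat age and free space when picking a victim.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ExcessReplicaSketch {
  record Replica(String datanode, String rack) {}

  static List<Replica> chooseReplicasToDelete(List<Replica> candidates, int expectedReplicas) {
    List<Replica> excess = new ArrayList<>();
    // Group the candidate replicas by rack, mirroring splitNodesWithRack().
    Map<String, List<Replica>> rackMap = new HashMap<>();
    for (Replica r : candidates) {
      rackMap.computeIfAbsent(r.rack(), k -> new ArrayList<>()).add(r);
    }
    while (candidates.size() - expectedReplicas > excess.size()) {
      // Prefer a replica from a rack that still holds more than one replica,
      // so deleting it never reduces the number of racks covered.
      Replica cur = null;
      for (List<Replica> sameRack : rackMap.values()) {
        if (sameRack.size() > 1) {
          cur = sameRack.get(0);
          break;
        }
      }
      if (cur == null) { // every rack has exactly one replica left
        cur = rackMap.values().iterator().next().get(0);
      }
      rackMap.get(cur.rack()).remove(cur);
      if (rackMap.get(cur.rack()).isEmpty()) {
        rackMap.remove(cur.rack());
      }
      excess.add(cur);
    }
    return excess;
  }

  public static void main(String[] args) {
    List<Replica> candidates = new ArrayList<>(List.of(
        new Replica("dn0", "/rack1"), new Replica("dn1", "/rack1"),
        new Replica("dn2", "/rack2"), new Replica("dn3", "/rack2")));
    // Replication factor 3: one replica is excess, and it comes from a rack
    // that holds two, so both racks stay covered.
    System.out.println(chooseReplicasToDelete(candidates, 3));
  }
}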
DatanodeManager.java

@@ -1183,14 +1183,6 @@ public class DatanodeManager {
    }
  }

-  /**
-   * @return true if this cluster has ever consisted of multiple racks, even if
-   *         it is not now a multi-rack cluster.
-   */
-  boolean hasClusterEverBeenMultiRack() {
-    return hasClusterEverBeenMultiRack;
-  }
-
  /**
   * Check if the cluster now consists of multiple racks. If it does, and this
   * is the first time it's consisted of multiple racks, then process blocks
NamenodeFsck.java

@@ -542,7 +542,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {

      // count mis replicated blocks
      BlockPlacementStatus blockPlacementStatus = bpPolicy
-          .verifyBlockPlacement(path, lBlk, targetFileReplication);
+          .verifyBlockPlacement(lBlk.getLocations(), targetFileReplication);
      if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
        res.numMisReplicatedBlocks++;
        misReplicatedPerFile++;
TestBalancer.java

@@ -323,12 +323,12 @@ public class TestBalancer {
    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);

    long[] capacities = new long[] { CAPACITY, CAPACITY };
+    String[] hosts = {"host0", "host1"};
    String[] racks = { RACK0, RACK1 };
    int numOfDatanodes = capacities.length;

    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
-        .hosts(new String[]{"localhost", "localhost"})
-        .racks(racks).simulatedCapacities(capacities).build();
+        .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();

    try {
      cluster.waitActive();

@@ -340,7 +340,10 @@ public class TestBalancer {
      long totalUsedSpace = totalCapacity * 8 / 10;
      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
      for (int i = 0; i < favoredNodes.length; i++) {
-        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+        // DFSClient will attempt reverse lookup. In case it resolves
+        // "127.0.0.1" to "localhost", we manually specify the hostname.
+        int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
+        favoredNodes[i] = new InetSocketAddress(hosts[i], port);
      }

      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
TestBlockManager.java

@@ -809,16 +809,16 @@ public class TestBlockManager {
    List<StorageType> excessTypes = new ArrayList<StorageType>();

    excessTypes.add(StorageType.DEFAULT);
-    Assert.assertTrue(BlockManager.useDelHint(true, delHint, null,
-        moreThan1Racks, excessTypes));
+    Assert.assertTrue(BlockPlacementPolicyDefault.useDelHint(true, delHint,
+        null, moreThan1Racks, excessTypes));
    excessTypes.remove(0);
    excessTypes.add(StorageType.SSD);
-    Assert.assertFalse(BlockManager.useDelHint(true, delHint, null,
-        moreThan1Racks, excessTypes));
+    Assert.assertFalse(BlockPlacementPolicyDefault.useDelHint(true, delHint,
+        null, moreThan1Racks, excessTypes));
  }

  /**
-   * {@link BlockManager#blockHasEnoughRacks(BlockInfo)} should return false
+   * {@link BlockManager#isPlacementPolicySatisfied(Block)} should return false
   * if all the replicas are on the same rack and shouldn't be dependent on
   * CommonConfigurationKeysPublic.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY
   * @throws Exception

@@ -837,7 +837,7 @@ public class TestBlockManager {
    BlockInfoContiguous blockInfo = addBlockOnNodes(1, rackA);
    // Since the network toppolgy is multi-rack, the blockHasEnoughRacks
    // should return false.
-    assertFalse("Replicas for block is not stored on enough racks",
-        bm.blockHasEnoughRacks(blockInfo));
+    assertFalse("Replicas for block is not stored on enough racks",
+        bm.isPlacementPolicySatisfied(blockInfo));
  }
}
TestReplicationPolicy.java

@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

@@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;

@@ -1040,12 +1042,12 @@ public class TestReplicationPolicy {
    {
      // test returning null
      excessTypes.add(StorageType.SSD);
-      assertNull(replicator.chooseReplicaToDelete(
-          null, null, (short)3, first, second, excessTypes));
+      assertNull(((BlockPlacementPolicyDefault) replicator)
+          .chooseReplicaToDelete((short) 3, first, second, excessTypes));
    }
    excessTypes.add(StorageType.DEFAULT);
-    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
-        null, null, (short)3, first, second, excessTypes);
+    DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
+        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
    // Within first set, storages[1] with less free space
    assertEquals(chosen, storages[1]);

@@ -1054,11 +1056,76 @@ public class TestReplicationPolicy {
    assertEquals(3, second.size());
    // Within second set, storages[5] with less free space
    excessTypes.add(StorageType.DEFAULT);
-    chosen = replicator.chooseReplicaToDelete(
-        null, null, (short)2, first, second, excessTypes);
+    chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
+        (short)2, first, second, excessTypes);
    assertEquals(chosen, storages[5]);
  }

+  @Test
+  public void testChooseReplicasToDelete() throws Exception {
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
+    nonExcess.add(storages[0]);
+    nonExcess.add(storages[1]);
+    nonExcess.add(storages[2]);
+    nonExcess.add(storages[3]);
+    List<DatanodeStorageInfo> excessReplicas = new ArrayList<>();
+    BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
+        .createDefaultSuite();
+    BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
+
+    // use delete hint case.
+
+    DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
+    List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
+    assertTrue(excessReplicas.size() > 0);
+    assertTrue(excessReplicas.contains(storages[0]));
+
+    // Excess type deletion
+
+    DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
+        "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
+        "foo.com", StorageType.ARCHIVE);
+    nonExcess.add(excessStorage);
+    excessTypes = storagePolicy.chooseExcess((short) 3,
+        DatanodeStorageInfo.toStorageTypes(nonExcess));
+    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, 3,
+        excessTypes, storages[3].getDatanodeDescriptor(), null);
+    assertTrue(excessReplicas.contains(excessStorage));
+  }
+
+  @Test
+  public void testUseDelHint() throws Exception {
+    List<StorageType> excessTypes = new ArrayList<StorageType>();
+    excessTypes.add(StorageType.ARCHIVE);
+    // only consider delHint for the first case
+    assertFalse(BlockPlacementPolicyDefault.useDelHint(false, null, null, null,
+        null));
+    // no delHint
+    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, null, null, null,
+        null));
+    // delHint storage type is not an excess type
+    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
+        null, excessTypes));
+    // check if removing delHint reduces the number of racks
+    List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
+    chosenNodes.add(storages[0]);
+    chosenNodes.add(storages[2]);
+    excessTypes.add(StorageType.DEFAULT);
+    assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[0], null,
+        chosenNodes, excessTypes));
+    // the added node adds a new rack
+    assertTrue(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
+        storages[5], chosenNodes, excessTypes));
+    // removing delHint reduces the number of racks;
+    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3],
+        storages[0], chosenNodes, excessTypes));
+    assertFalse(BlockPlacementPolicyDefault.useDelHint(true, storages[3], null,
+        chosenNodes, excessTypes));
+  }
+
  /**
   * This testcase tests whether the default value returned by
   * DFSUtil.getInvalidateWorkPctPerIteration() is positive,

TestReplicationPolicyWithNodeGroup.java

@@ -615,8 +615,8 @@ public class TestReplicationPolicyWithNodeGroup {
    assertEquals(1, second.size());
    List<StorageType> excessTypes = new ArrayList<StorageType>();
    excessTypes.add(StorageType.DEFAULT);
-    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
-        null, null, (short)3, first, second, excessTypes);
+    DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
+        .chooseReplicaToDelete((short) 3, first, second, excessTypes);
    // Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
    // dataNodes[0] and dataNodes[1] are in the same nodegroup,
    // but dataNodes[1] is chosen as less free space

@@ -628,8 +628,8 @@ public class TestReplicationPolicyWithNodeGroup {
    // Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
    // as less free space
    excessTypes.add(StorageType.DEFAULT);
-    chosen = replicator.chooseReplicaToDelete(
-        null, null, (short)2, first, second, excessTypes);
+    chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
+        (short) 2, first, second, excessTypes);
    assertEquals(chosen, storages[2]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);

@@ -637,8 +637,8 @@ public class TestReplicationPolicyWithNodeGroup {
    assertEquals(2, second.size());
    // Within second set, dataNodes[5] with less free space
    excessTypes.add(StorageType.DEFAULT);
-    chosen = replicator.chooseReplicaToDelete(
-        null, null, (short)1, first, second, excessTypes);
+    chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
+        (short) 1, first, second, excessTypes);
    assertEquals(chosen, storages[5]);
  }

TestDNFencing.java

@@ -40,10 +40,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;

@@ -631,10 +629,8 @@ public class TestDNFencing {
    }

    @Override
-    public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
-        Block block, short replicationFactor,
-        Collection<DatanodeStorageInfo> first,
-        Collection<DatanodeStorageInfo> second,
+    public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor,
+        Collection<DatanodeStorageInfo> first, Collection<DatanodeStorageInfo> second,
        List<StorageType> excessTypes) {

      Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;