HDFS-6710. Change BlockPlacementPolicy to consider block storage policy in replica deletion.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612265 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2b07af0c59
commit
014be2510f
|
@ -11,6 +11,9 @@ HDFS-6584: Archival Storage
|
|||
HDFS-6671. Change BlockPlacementPolicy to consider block storage policy
|
||||
in replicaiton. (szetszwo)
|
||||
|
||||
HDFS-6710. Change BlockPlacementPolicy to consider block storage policy
|
||||
in replica deletion. (szetszwo)
|
||||
|
||||
Trunk (Unreleased)
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -118,17 +118,42 @@ public class BlockStoragePolicy {
|
|||
public List<StorageType> chooseStorageTypes(final short replication,
|
||||
final Iterable<StorageType> chosen) {
|
||||
final List<StorageType> types = chooseStorageTypes(replication);
|
||||
|
||||
//remove the chosen storage types
|
||||
for(StorageType c : chosen) {
|
||||
final int i = types.indexOf(c);
|
||||
if (i >= 0) {
|
||||
types.remove(i);
|
||||
}
|
||||
}
|
||||
diff(types, chosen, null);
|
||||
return types;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the list difference t = t - c.
|
||||
* Further, if e is not null, set e = e + c - t;
|
||||
*/
|
||||
private static void diff(List<StorageType> t, Iterable<StorageType> c,
|
||||
List<StorageType> e) {
|
||||
for(StorageType storagetype : c) {
|
||||
final int i = t.indexOf(storagetype);
|
||||
if (i >= 0) {
|
||||
t.remove(i);
|
||||
} else if (e != null) {
|
||||
e.add(storagetype);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Choose excess storage types for deletion, given the
|
||||
* replication number and the storage types of the chosen replicas.
|
||||
*
|
||||
* @param replication the replication number.
|
||||
* @param chosen the storage types of the chosen replicas.
|
||||
* @return a list of {@link StorageType}s for deletion.
|
||||
*/
|
||||
public List<StorageType> chooseExcess(final short replication,
|
||||
final Iterable<StorageType> chosen) {
|
||||
final List<StorageType> types = chooseStorageTypes(replication);
|
||||
final List<StorageType> excess = new LinkedList<StorageType>();
|
||||
diff(types, chosen, excess);
|
||||
return excess;
|
||||
}
|
||||
|
||||
/** @return the fallback {@link StorageType} for creation. */
|
||||
public StorageType getCreationFallback(EnumSet<StorageType> unavailables) {
|
||||
return getFallback(unavailables, creationFallbacks);
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.BlockStoragePolicy;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
||||
|
@ -2712,6 +2713,10 @@ public class BlockManager {
|
|||
assert namesystem.hasWriteLock();
|
||||
// first form a rack to datanodes map and
|
||||
BlockCollection bc = getBlockCollection(b);
|
||||
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
|
||||
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
|
||||
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
|
||||
|
||||
|
||||
final Map<String, List<DatanodeStorageInfo>> rackMap
|
||||
= new HashMap<String, List<DatanodeStorageInfo>>();
|
||||
|
@ -2741,7 +2746,7 @@ public class BlockManager {
|
|||
cur = delNodeHintStorage;
|
||||
} else { // regular excessive replica removal
|
||||
cur = replicator.chooseReplicaToDelete(bc, b, replication,
|
||||
moreThanOne, exactlyOne);
|
||||
moreThanOne, exactlyOne, excessTypes);
|
||||
}
|
||||
firstOne = false;
|
||||
|
||||
|
|
|
@ -119,18 +119,21 @@ public abstract class BlockPlacementPolicy {
|
|||
* @param srcBC block collection of file to which block-to-be-deleted belongs
|
||||
* @param block The block to be deleted
|
||||
* @param replicationFactor The required number of replicas for this block
|
||||
* @param existingReplicas The replica locations of this block that are present
|
||||
on at least two unique racks.
|
||||
* @param moreExistingReplicas Replica locations of this block that are not
|
||||
listed in the previous parameter.
|
||||
* @param moreThanOne The replica locations of this block that are present
|
||||
* on more than one unique racks.
|
||||
* @param exactlyOne Replica locations of this block that are present
|
||||
* on exactly one unique racks.
|
||||
* @param excessTypes The excess {@link StorageType}s according to the
|
||||
* {@link BlockStoragePolicy}.
|
||||
* @return the replica that is the best candidate for deletion
|
||||
*/
|
||||
abstract public DatanodeStorageInfo chooseReplicaToDelete(
|
||||
BlockCollection srcBC,
|
||||
Block block,
|
||||
short replicationFactor,
|
||||
Collection<DatanodeStorageInfo> existingReplicas,
|
||||
Collection<DatanodeStorageInfo> moreExistingReplicas);
|
||||
Collection<DatanodeStorageInfo> moreThanOne,
|
||||
Collection<DatanodeStorageInfo> exactlyOne,
|
||||
List<StorageType> excessTypes);
|
||||
|
||||
/**
|
||||
* Used to setup a BlockPlacementPolicy object. This should be defined by
|
||||
|
|
|
@ -22,7 +22,6 @@ import static org.apache.hadoop.util.Time.now;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
@ -273,8 +272,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
// Keep a copy of original excludedNodes
|
||||
final Set<Node> oldExcludedNodes = avoidStaleNodes ?
|
||||
new HashSet<Node>(excludedNodes) : null;
|
||||
final List<StorageType> storageTypes = chooseStorageTypes(storagePolicy,
|
||||
(short)totalReplicasExpected, results);
|
||||
final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
|
||||
(short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results));
|
||||
try {
|
||||
if (numOfResults == 0) {
|
||||
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
|
||||
|
@ -672,28 +671,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
return true;
|
||||
}
|
||||
|
||||
private static List<StorageType> chooseStorageTypes(
|
||||
final BlockStoragePolicy storagePolicy, final short replication,
|
||||
final Iterable<DatanodeStorageInfo> chosen) {
|
||||
return storagePolicy.chooseStorageTypes(
|
||||
replication, new Iterable<StorageType>() {
|
||||
@Override
|
||||
public Iterator<StorageType> iterator() {
|
||||
return new Iterator<StorageType>() {
|
||||
final Iterator<DatanodeStorageInfo> i = chosen.iterator();
|
||||
@Override
|
||||
public boolean hasNext() {return i.hasNext();}
|
||||
@Override
|
||||
public StorageType next() {return i.next().getStorageType();}
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a pipeline of nodes.
|
||||
* The pipeline is formed finding a shortest path that
|
||||
|
@ -759,7 +736,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
|
||||
Block block, short replicationFactor,
|
||||
Collection<DatanodeStorageInfo> first,
|
||||
Collection<DatanodeStorageInfo> second) {
|
||||
Collection<DatanodeStorageInfo> second,
|
||||
final List<StorageType> excessTypes) {
|
||||
long oldestHeartbeat =
|
||||
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
|
||||
DatanodeStorageInfo oldestHeartbeatStorage = null;
|
||||
|
@ -769,6 +747,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
// Pick the node with the oldest heartbeat or with the least free space,
|
||||
// if all hearbeats are within the tolerable heartbeat interval
|
||||
for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
|
||||
if (!excessTypes.contains(storage.getStorageType())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
|
||||
long free = node.getRemaining();
|
||||
long lastHeartbeat = node.getLastUpdate();
|
||||
|
@ -781,9 +763,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
|||
minSpaceStorage = storage;
|
||||
}
|
||||
}
|
||||
|
||||
return oldestHeartbeatStorage != null? oldestHeartbeatStorage
|
||||
: minSpaceStorage;
|
||||
final DatanodeStorageInfo storage = oldestHeartbeatStorage != null?
|
||||
oldestHeartbeatStorage : minSpaceStorage;
|
||||
excessTypes.remove(storage.getStorageType());
|
||||
return storage;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -292,6 +292,26 @@ public class DatanodeStorageInfo {
|
|||
return "[" + storageType + "]" + storageID + ":" + state;
|
||||
}
|
||||
|
||||
static Iterable<StorageType> toStorageTypes(
|
||||
final Iterable<DatanodeStorageInfo> infos) {
|
||||
return new Iterable<StorageType>() {
|
||||
@Override
|
||||
public Iterator<StorageType> iterator() {
|
||||
return new Iterator<StorageType>() {
|
||||
final Iterator<DatanodeStorageInfo> i = infos.iterator();
|
||||
@Override
|
||||
public boolean hasNext() {return i.hasNext();}
|
||||
@Override
|
||||
public StorageType next() {return i.next().getStorageType();}
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/** @return the first {@link DatanodeStorageInfo} corresponding to
|
||||
* the given datanode
|
||||
*/
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.LogVerificationAppender;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
|
@ -933,8 +934,10 @@ public class TestReplicationPolicy {
|
|||
// replica nodes, while storages[2] and dataNodes[5] are in second set.
|
||||
assertEquals(2, first.size());
|
||||
assertEquals(2, second.size());
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second);
|
||||
null, null, (short)3, first, second, excessTypes);
|
||||
// Within first set, storages[1] with less free space
|
||||
assertEquals(chosen, storages[1]);
|
||||
|
||||
|
@ -942,8 +945,9 @@ public class TestReplicationPolicy {
|
|||
assertEquals(0, first.size());
|
||||
assertEquals(3, second.size());
|
||||
// Within second set, storages[5] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)2, first, second);
|
||||
null, null, (short)2, first, second, excessTypes);
|
||||
assertEquals(chosen, storages[5]);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
|
@ -612,8 +613,10 @@ public class TestReplicationPolicyWithNodeGroup {
|
|||
replicaList, rackMap, first, second);
|
||||
assertEquals(3, first.size());
|
||||
assertEquals(1, second.size());
|
||||
List<StorageType> excessTypes = new ArrayList<StorageType>();
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)3, first, second);
|
||||
null, null, (short)3, first, second, excessTypes);
|
||||
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
|
||||
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
|
||||
// but dataNodes[1] is chosen as less free space
|
||||
|
@ -624,16 +627,18 @@ public class TestReplicationPolicyWithNodeGroup {
|
|||
assertEquals(1, second.size());
|
||||
// Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
|
||||
// as less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)2, first, second);
|
||||
null, null, (short)2, first, second, excessTypes);
|
||||
assertEquals(chosen, storages[2]);
|
||||
|
||||
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
|
||||
assertEquals(0, first.size());
|
||||
assertEquals(2, second.size());
|
||||
// Within second set, dataNodes[5] with less free space
|
||||
excessTypes.add(StorageType.DEFAULT);
|
||||
chosen = replicator.chooseReplicaToDelete(
|
||||
null, null, (short)1, first, second);
|
||||
null, null, (short)1, first, second, excessTypes);
|
||||
assertEquals(chosen, storages[5]);
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
|
@ -588,7 +589,8 @@ public class TestDNFencing {
|
|||
public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection inode,
|
||||
Block block, short replicationFactor,
|
||||
Collection<DatanodeStorageInfo> first,
|
||||
Collection<DatanodeStorageInfo> second) {
|
||||
Collection<DatanodeStorageInfo> second,
|
||||
List<StorageType> excessTypes) {
|
||||
|
||||
Collection<DatanodeStorageInfo> chooseFrom = !first.isEmpty() ? first : second;
|
||||
|
||||
|
|
Loading…
Reference in New Issue