HDFS-4990. Change BlockPlacementPolicy to choose storages instead of datanodes.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1524444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-09-18 15:12:52 +00:00
parent 0398943572
commit abf09f090f
32 changed files with 926 additions and 676 deletions

View File

@ -23,3 +23,6 @@ IMPROVEMENTS:
fix a synchronization problem in DatanodeStorageInfo. (szetszwo)
HDFS-5157. Add StorageType to FsVolume. (Junping Du via szetszwo)
HDFS-4990. Change BlockPlacementPolicy to choose storages instead of
datanodes. (szetszwo)

View File

@ -913,7 +913,8 @@ public class DFSOutputStream extends FSOutputSummer
//get a new datanode
final DatanodeInfo[] original = nodes;
final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
src, block, nodes, storageIDs,
failed.toArray(new DatanodeInfo[failed.size()]),
1, dfsClient.clientName);
nodes = lb.getLocations();

View File

@ -131,6 +131,23 @@ public class DFSUtil {
return SECURE_RANDOM.get();
}
/** Shuffle the elements in the given array. */
public static <T> T[] shuffle(final T[] array) {
if (array != null && array.length > 0) {
final Random random = getRandom();
for (int n = array.length; n > 1; ) {
final int randomIndex = random.nextInt(n);
n--;
if (n != randomIndex) {
final T tmp = array[randomIndex];
array[randomIndex] = array[n];
array[n] = tmp;
}
}
}
return array;
}
/**
* Compartor for sorting DataNodeInfo[] based on decommissioned states.
* Decommissioned nodes are moved to the end of the array on sorting with

View File

@ -353,7 +353,8 @@ public interface ClientProtocol {
*/
@Idempotent
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final DatanodeInfo[] existings, final String[] existingStorageIDs,
final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws AccessControlException, FileNotFoundException,
SafeModeException, UnresolvedLinkException, IOException;

View File

@ -59,18 +59,18 @@ public class LocatedBlock {
this(b, locs, null, null, startOffset, corrupt);
}
public static LocatedBlock createLocatedBlock(ExtendedBlock b,
DatanodeStorageInfo[] storages, long startOffset, boolean corrupt) {
final DatanodeInfo[] locs = new DatanodeInfo[storages.length];
final String[] storageIDs = new String[storages.length];
final StorageType[] storageType = new StorageType[storages.length];
for(int i = 0; i < storages.length; i++) {
locs[i] = storages[i].getDatanodeDescriptor();
storageIDs[i] = storages[i].getStorageID();
storageType[i] = storages[i].getStorageType();
}
return new LocatedBlock(b, locs, storageIDs, storageType, startOffset, corrupt);
public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages) {
this(b, storages, -1, false); // startOffset is unknown
}
public LocatedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
long startOffset, boolean corrupt) {
this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages),
startOffset, corrupt); // startOffset is unknown
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
StorageType[] storageTypes, long startOffset,
boolean corrupt) {

View File

@ -405,14 +405,17 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throws ServiceException {
try {
List<DatanodeInfoProto> existingList = req.getExistingsList();
List<String> existingStorageIDsList = req.getExistingStorageIDsList();
List<DatanodeInfoProto> excludesList = req.getExcludesList();
LocatedBlock result = server.getAdditionalDatanode(
req.getSrc(), PBHelper.convert(req.getBlk()),
LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
PBHelper.convert(req.getBlk()),
PBHelper.convert(existingList.toArray(
new DatanodeInfoProto[existingList.size()])),
existingStorageIDsList.toArray(
new String[existingStorageIDsList.size()]),
PBHelper.convert(excludesList.toArray(
new DatanodeInfoProto[excludesList.size()])),
req.getNumAdditionalNodes(), req.getClientName());
req.getNumAdditionalNodes(), req.getClientName());
return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
PBHelper.convert(result))
.build();

View File

@ -335,7 +335,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public LocatedBlock getAdditionalDatanode(String src, ExtendedBlock blk,
DatanodeInfo[] existings, DatanodeInfo[] excludes,
DatanodeInfo[] existings, String[] existingStorageIDs,
DatanodeInfo[] excludes,
int numAdditionalNodes, String clientName) throws AccessControlException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
IOException {
@ -344,6 +345,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
.addAllExistingStorageIDs(Arrays.asList(existingStorageIDs))
.addAllExcludes(PBHelper.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)

View File

@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto.File
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageIDsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
@ -744,7 +745,8 @@ public class PBHelper {
for (int i = 0; i < blocks.length; i++) {
builder.addBlocks(PBHelper.convert(blocks[i]));
}
builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
builder.addAllTargets(convert(cmd.getTargets()))
.addAllTargetStorageIDs(convert(cmd.getTargetStorageIDs()));
return builder.build();
}
@ -757,6 +759,15 @@ public class PBHelper {
return Arrays.asList(ret);
}
private static List<StorageIDsProto> convert(String[][] targetStorageIDs) {
StorageIDsProto[] ret = new StorageIDsProto[targetStorageIDs.length];
for (int i = 0; i < targetStorageIDs.length; i++) {
ret[i] = StorageIDsProto.newBuilder()
.addAllStorageIDs(Arrays.asList(targetStorageIDs[i])).build();
}
return Arrays.asList(ret);
}
public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
if (datanodeCommand == null) {
@ -831,6 +842,14 @@ public class PBHelper {
for (int i = 0; i < targetList.size(); i++) {
targets[i] = PBHelper.convert(targetList.get(i));
}
List<StorageIDsProto> targetStorageIDsList = blkCmd.getTargetStorageIDsList();
String[][] targetStorageIDs = new String[targetStorageIDsList.size()][];
for(int i = 0; i < targetStorageIDs.length; i++) {
List<String> storageIDs = targetStorageIDsList.get(i).getStorageIDsList();
targetStorageIDs[i] = storageIDs.toArray(new String[storageIDs.size()]);
}
int action = DatanodeProtocol.DNA_UNKNOWN;
switch (blkCmd.getAction()) {
case TRANSFER:
@ -843,7 +862,8 @@ public class PBHelper {
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
targetStorageIDs);
}
public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {

View File

@ -835,16 +835,6 @@ public class Balancer {
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
}
/* Shuffle datanode array */
static private void shuffleArray(DatanodeInfo[] datanodes) {
for (int i=datanodes.length; i>1; i--) {
int randomIndex = DFSUtil.getRandom().nextInt(i);
DatanodeInfo tmp = datanodes[randomIndex];
datanodes[randomIndex] = datanodes[i-1];
datanodes[i-1] = tmp;
}
}
/* Given a data node set, build a network topology and decide
* over-utilized datanodes, above average utilized datanodes,
* below average utilized datanodes, and underutilized datanodes.
@ -874,8 +864,7 @@ public class Balancer {
* an increasing order or a decreasing order.
*/
long overLoadedBytes = 0L, underLoadedBytes = 0L;
shuffleArray(datanodes);
for (DatanodeInfo datanode : datanodes) {
for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
continue; // ignore decommissioning or decommissioned nodes
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
@ -473,8 +474,8 @@ public class BlockManager {
private void dumpBlockMeta(Block block, PrintWriter out) {
List<DatanodeDescriptor> containingNodes =
new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> containingLiveReplicasNodes =
new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> containingLiveReplicasNodes =
new ArrayList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
@ -774,7 +775,7 @@ public class BlockManager {
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return LocatedBlock.createLocatedBlock(eb, storages, pos, false);
return new LocatedBlock(eb, storages, pos, false);
}
// get block locations
@ -789,14 +790,14 @@ public class BlockManager {
final int numNodes = blocksMap.numNodes(blk);
final boolean isCorrupt = numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
int j = 0;
if (numMachines > 0) {
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!isCorrupt && !replicaCorrupt))
machines[j++] = d;
machines[j++] = storage;
}
}
assert j == machines.length :
@ -1195,7 +1196,7 @@ public class BlockManager {
@VisibleForTesting
int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode;
BlockCollection bc = null;
int additionalReplRequired;
@ -1220,7 +1221,7 @@ public class BlockManager {
// get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>();
liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
block, containingNodes, liveReplicaNodes, numReplicas,
@ -1279,7 +1280,7 @@ public class BlockManager {
namesystem.writeLock();
try {
for(ReplicationWork rw : work){
DatanodeDescriptor[] targets = rw.targets;
final DatanodeStorageInfo[] targets = rw.targets;
if(targets == null || targets.length == 0){
rw.targets = null;
continue;
@ -1317,7 +1318,8 @@ public class BlockManager {
if ( (numReplicas.liveReplicas() >= requiredReplication) &&
(!blockHasEnoughRacks(block)) ) {
if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
if (rw.srcNode.getNetworkLocation().equals(
targets[0].getDatanodeDescriptor().getNetworkLocation())) {
//No use continuing, unless a new rack in this case
continue;
}
@ -1327,8 +1329,8 @@ public class BlockManager {
rw.srcNode.addBlockToBeReplicated(block, targets);
scheduledWork++;
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
for (DatanodeStorageInfo storage : targets) {
storage.getDatanodeDescriptor().incBlocksScheduled();
}
// Move the block-replication into a "pending" state.
@ -1354,7 +1356,7 @@ public class BlockManager {
if (blockLog.isInfoEnabled()) {
// log which blocks have been scheduled for replication
for(ReplicationWork rw : work){
DatanodeDescriptor[] targets = rw.targets;
DatanodeStorageInfo[] targets = rw.targets;
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
for (int k = 0; k < targets.length; k++) {
@ -1383,15 +1385,16 @@ public class BlockManager {
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* List, boolean, Set, long)
*/
public DatanodeDescriptor[] chooseTarget(final String src,
public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
final Set<Node> excludedNodes,
final long blocksize, List<String> favoredNodes) throws IOException {
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors);
// TODO: get storage type from file
favoredDatanodeDescriptors, StorageType.DEFAULT);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="
@ -1452,12 +1455,11 @@ public class BlockManager {
* the given block
*/
@VisibleForTesting
DatanodeDescriptor chooseSourceDatanode(
Block block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeDescriptor> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
int priority) {
DatanodeDescriptor chooseSourceDatanode(Block block,
List<DatanodeDescriptor> containingNodes,
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
NumberReplicas numReplicas,
int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
@ -1478,7 +1480,7 @@ public class BlockManager {
else if (excessBlocks != null && excessBlocks.contains(block)) {
excess++;
} else {
nodesContainingLiveReplicas.add(node);
nodesContainingLiveReplicas.add(storage);
live++;
}
containingNodes.add(node);
@ -1621,7 +1623,8 @@ public class BlockManager {
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
final DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
LOG.info("XXX storageInfo=" + storageInfo + ", storage=" + storage);
if (namesystem.isInStartupSafeMode()
&& storageInfo.getBlockReportCount() > 0) {
blockLog.info("BLOCK* processReport: "
@ -2636,7 +2639,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
//
// Modify the blocks->datanode map and node's map.
//
pendingReplications.decrement(block, node);
pendingReplications.decrement(block, node, storageID);
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
delHintNode);
}
@ -3225,24 +3228,24 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
private DatanodeDescriptor srcNode;
private List<DatanodeDescriptor> containingNodes;
private List<DatanodeDescriptor> liveReplicaNodes;
private List<DatanodeStorageInfo> liveReplicaStorages;
private int additionalReplRequired;
private DatanodeDescriptor targets[];
private DatanodeStorageInfo targets[];
private int priority;
public ReplicationWork(Block block,
BlockCollection bc,
DatanodeDescriptor srcNode,
List<DatanodeDescriptor> containingNodes,
List<DatanodeDescriptor> liveReplicaNodes,
List<DatanodeStorageInfo> liveReplicaStorages,
int additionalReplRequired,
int priority) {
this.block = block;
this.bc = bc;
this.srcNode = srcNode;
this.containingNodes = containingNodes;
this.liveReplicaNodes = liveReplicaNodes;
this.liveReplicaStorages = liveReplicaStorages;
this.additionalReplRequired = additionalReplRequired;
this.priority = priority;
this.targets = null;
@ -3251,8 +3254,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
private void chooseTargets(BlockPlacementPolicy blockplacement,
Set<Node> excludedNodes) {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaNodes, false,
excludedNodes, block.getNumBytes());
additionalReplRequired, srcNode, liveReplicaStorages, false,
excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -67,13 +68,14 @@ public abstract class BlockPlacementPolicy {
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeDescriptor> chosenNodes,
List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize);
long blocksize,
StorageType storageType);
/**
* Same as {@link #chooseTarget(String, int, Node, List, boolean,
@ -82,16 +84,19 @@ public abstract class BlockPlacementPolicy {
* is only a hint and due to cluster state, namenode may not be
* able to place the blocks on these datanodes.
*/
DatanodeDescriptor[] chooseTarget(String src,
DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas, Node writer,
Set<Node> excludedNodes,
long blocksize, List<DatanodeDescriptor> favoredNodes) {
long blocksize,
List<DatanodeDescriptor> favoredNodes,
StorageType storageType) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
// are expected to provide this functionality
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes,
blocksize);
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
}
/**

View File

@ -29,11 +29,14 @@ import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
@ -103,99 +106,101 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
public DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeDescriptor> chosenNodes,
List<DatanodeStorageInfo> chosenNodes,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize) {
long blocksize,
StorageType storageType) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize);
excludedNodes, blocksize, storageType);
}
@Override
DatanodeDescriptor[] chooseTarget(String src,
DatanodeStorageInfo[] chooseTarget(String src,
int numOfReplicas,
Node writer,
Set<Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes) {
List<DatanodeDescriptor> favoredNodes,
StorageType storageType) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
excludedNodes, blocksize);
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
}
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
new HashSet<Node>() : new HashSet<Node>(excludedNodes);
// Choose favored nodes
List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode
DatanodeDescriptor target = chooseLocalNode(favoredNode,
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize,
getMaxNodesPerRack(results,
numOfReplicas)[1], results, avoidStaleNodes);
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
results, avoidStaleNodes, storageType);
if (target == null) {
LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode);
continue;
}
favoriteAndExcludedNodes.add(target);
favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
}
if (results.size() < numOfReplicas) {
// Not enough favored nodes, choose other nodes.
numOfReplicas -= results.size();
DatanodeDescriptor[] remainingTargets =
DatanodeStorageInfo[] remainingTargets =
chooseTarget(src, numOfReplicas, writer, results,
false, favoriteAndExcludedNodes, blocksize);
false, favoriteAndExcludedNodes, blocksize, storageType);
for (int i = 0; i < remainingTargets.length; i++) {
results.add(remainingTargets[i]);
}
}
return getPipeline(writer,
results.toArray(new DatanodeDescriptor[results.size()]));
results.toArray(new DatanodeStorageInfo[results.size()]));
} catch (NotEnoughReplicasException nr) {
// Fall back to regular block placement disregarding favored nodes hint
return chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>(numOfReplicas), false,
excludedNodes, blocksize);
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
excludedNodes, blocksize, storageType);
}
}
/** This is the implementation. */
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
List<DatanodeDescriptor> chosenNodes,
List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize) {
long blocksize,
StorageType storageType) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeDescriptor.EMPTY_ARRAY;
return DatanodeStorageInfo.EMPTY_ARRAY;
}
if (excludedNodes == null) {
excludedNodes = new HashSet<Node>();
}
int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
List<DatanodeDescriptor> results =
new ArrayList<DatanodeDescriptor>(chosenNodes);
for (DatanodeDescriptor node:chosenNodes) {
final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(node, excludedNodes);
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
if (!clusterMap.contains(writer)) {
@ -205,20 +210,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
Node localNode = chooseTarget(numOfReplicas, writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
if (!returnChosenNodes) {
results.removeAll(chosenNodes);
results.removeAll(chosenStorage);
}
// sorting nodes to form a pipeline
return getPipeline((writer==null)?localNode:writer,
results.toArray(new DatanodeDescriptor[results.size()]));
results.toArray(new DatanodeStorageInfo[results.size()]));
}
private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
int numOfReplicas) {
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves();
int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
int totalNumOfReplicas = numOfChosen + numOfReplicas;
if (totalNumOfReplicas > clusterSize) {
numOfReplicas -= (totalNumOfReplicas-clusterSize);
totalNumOfReplicas = clusterSize;
@ -243,8 +247,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
final boolean avoidStaleNodes) {
List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
StorageType storageType) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return writer;
}
@ -253,7 +258,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
int numOfResults = results.size();
boolean newBlock = (numOfResults==0);
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0);
writer = results.get(0).getDatanodeDescriptor();
}
// Keep a copy of original excludedNodes
@ -261,42 +266,49 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
new HashSet<Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
writer = chooseLocalNode(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType)
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
}
}
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
chooseRemoteRack(1, results.get(0), excludedNodes,
blocksize, maxNodesPerRack,
results, avoidStaleNodes);
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
if (clusterMap.isOnSameRack(dn0, dn1)) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
} else if (newBlock){
chooseLocalRack(results.get(1), excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
} else {
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
results, avoidStaleNodes, storageType);
}
if (--numOfReplicas == 0) {
return writer;
}
}
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
LOG.warn("Not able to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected + "\n"
+ e.getMessage());
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected + ".";
if (LOG.isTraceEnabled()) {
LOG.trace(message, e);
} else {
LOG.warn(message + " " + e.getMessage());
}
if (avoidStaleNodes) {
// Retry chooseTarget again, this time not avoiding stale nodes.
@ -304,14 +316,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// not chosen because they were stale, decommissioned, etc.
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
for (Node node : results) {
oldExcludedNodes.add(node);
for (DatanodeStorageInfo resultStorage : results) {
oldExcludedNodes.add(resultStorage.getDatanodeDescriptor());
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false);
maxNodesPerRack, results, false, storageType);
}
}
return writer;
@ -321,32 +333,36 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
* @return the chosen node
* @return the chosen storage
*/
protected DatanodeDescriptor chooseLocalNode(Node localMachine,
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list
if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
return localDatanode;
for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
localDatanode.getStorageInfos())) {
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
return localStorage;
}
}
}
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
/**
@ -368,27 +384,29 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* in the cluster.
* @return the chosen node
*/
protected DatanodeDescriptor chooseLocalRack(Node localMachine,
protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local rack
try {
return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(DatanodeDescriptor nextNode : results) {
for(DatanodeStorageInfo resultStorage : results) {
DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@ -397,16 +415,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (newLocal != null) {
try {
return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
@ -423,48 +441,51 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
Set<Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
excludedNodes, blocksize, maxReplicasPerRack, results,
avoidStaleNodes);
avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes);
maxReplicasPerRack, results, avoidStaleNodes, storageType);
}
}
/**
* Randomly choose one target from the given <i>scope</i>.
* @return the chosen node, if there is any.
* @return the chosen storage, if there is any.
*/
protected DatanodeDescriptor chooseRandom(String scope,
protected DatanodeStorageInfo chooseRandom(String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType)
throws NotEnoughReplicasException {
return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
results, avoidStaleNodes, storageType);
}
/**
* Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
* @return the first chosen node, if there is any.
*/
protected DatanodeDescriptor chooseRandom(int numOfReplicas,
protected DatanodeStorageInfo chooseRandom(int numOfReplicas,
String scope,
Set<Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType)
throws NotEnoughReplicasException {
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
@ -475,24 +496,28 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
builder.setLength(0);
builder.append("[");
}
boolean badTarget = false;
DatanodeDescriptor firstChosen = null;
boolean goodTarget = false;
DatanodeStorageInfo firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
if (excludedNodes.add(chosenNode)) { //was not in the excluded list
numOfAvailableNodes--;
int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
if (newExcludedNodes >= 0) {
numOfReplicas--;
if (firstChosen == null) {
firstChosen = chosenNode;
final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
chosenNode.getStorageInfos());
for(int i = 0; i < storages.length && !goodTarget; i++) {
final int newExcludedNodes = addIfIsGoodTarget(storages[i],
excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
avoidStaleNodes, storageType);
goodTarget = newExcludedNodes >= 0;
if (goodTarget) {
numOfReplicas--;
if (firstChosen == null) {
firstChosen = storages[i];
}
numOfAvailableNodes -= newExcludedNodes;
}
numOfAvailableNodes -= newExcludedNodes;
} else {
badTarget = true;
}
}
}
@ -500,7 +525,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (numOfReplicas>0) {
String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) {
if (badTarget && builder != null) {
if (!goodTarget && builder != null) {
detail = builder.append("]").toString();
builder.setLength(0);
} else detail = "";
@ -512,43 +537,46 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
/**
* If the given node is a good target, add it to the result list and
* If the given storage is a good target, add it to the result list and
* update the set of excluded nodes.
* @return -1 if the given is not a good target;
* otherwise, return the number of nodes added to excludedNodes set.
*/
int addIfIsGoodTarget(DatanodeDescriptor node,
int addIfIsGoodTarget(DatanodeStorageInfo storage,
Set<Node> excludedNodes,
long blockSize,
int maxNodesPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
results, avoidStaleNodes)) {
results.add(node);
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType) {
if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad,
results, avoidStaleNodes, storageType)) {
results.add(storage);
// add node and related nodes to excludedNode
return addToExcludedNodes(node, excludedNodes);
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
} else {
return -1;
}
}
private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
if (LOG.isDebugEnabled()) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
// build the error message for later use.
debugLoggingBuilder.get()
.append(node).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append("Storage ").append(storage)
.append("at node ").append(NodeBase.getPath(node))
.append(" is not chosen because ")
.append(reason);
}
}
/**
* Determine if a node is a good target.
* Determine if a storage is a good target.
*
* @param node The target node
* @param storage The target storage
* @param blockSize Size of block
* @param maxTargetPerRack Maximum number of targets per rack. The value of
* this parameter depends on the number of racks in
@ -561,29 +589,47 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* does not have too much load,
* and the rack does not have too many nodes.
*/
private boolean isGoodTarget(DatanodeDescriptor node,
private boolean isGoodTarget(DatanodeStorageInfo storage,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissed
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType) {
if (storage.getStorageType() != storageType) {
logNodeIsNotChosen(storage,
"storage types do not match, where the expected storage type is "
+ storageType);
return false;
}
if (storage.getState() == State.READ_ONLY) {
logNodeIsNotChosen(storage, "storage is read-only");
return false;
}
DatanodeDescriptor node = storage.getDatanodeDescriptor();
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
logNodeIsNotChosen(node, "the node is (being) decommissioned ");
logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
logNodeIsNotChosen(node, "the node is stale ");
logNodeIsNotChosen(storage, "the node is stale ");
return false;
}
}
final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
if (requiredSize > storage.getRemaining()) {
logNodeIsNotChosen(storage, "the storage does not have enough space ");
return false;
}
//TODO: move getBlocksScheduled() to DatanodeStorageInfo.
long remaining = node.getRemaining() -
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
logNodeIsNotChosen(node, "the node does not have enough space ");
if (requiredSize > remaining) {
logNodeIsNotChosen(storage, "the node does not have enough space ");
return false;
}
@ -595,7 +641,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
avgLoad = (double)stats.getTotalLoad()/size;
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
logNodeIsNotChosen(node, "the node is too busy ");
logNodeIsNotChosen(storage, "the node is too busy ");
return false;
}
}
@ -603,13 +649,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
for(Node result : results) {
if (rackname.equals(result.getNetworkLocation())) {
for(DatanodeStorageInfo resultStorage : results) {
if (rackname.equals(
resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
return false;
}
return true;
@ -621,37 +668,40 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* starts from the writer and traverses all <i>nodes</i>
* This is basically a traveling salesman problem.
*/
private DatanodeDescriptor[] getPipeline(Node writer,
DatanodeDescriptor[] nodes) {
if (nodes.length==0) return nodes;
private DatanodeStorageInfo[] getPipeline(Node writer,
DatanodeStorageInfo[] storages) {
if (storages.length == 0) {
return storages;
}
synchronized(clusterMap) {
int index=0;
if (writer == null || !clusterMap.contains(writer)) {
writer = nodes[0];
writer = storages[0].getDatanodeDescriptor();
}
for(;index<nodes.length; index++) {
DatanodeDescriptor shortestNode = nodes[index];
int shortestDistance = clusterMap.getDistance(writer, shortestNode);
for(; index < storages.length; index++) {
DatanodeStorageInfo shortestStorage = storages[index];
int shortestDistance = clusterMap.getDistance(writer,
shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
for(int i=index+1; i<nodes.length; i++) {
DatanodeDescriptor currentNode = nodes[i];
int currentDistance = clusterMap.getDistance(writer, currentNode);
for(int i = index + 1; i < storages.length; i++) {
int currentDistance = clusterMap.getDistance(writer,
storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestNode = currentNode;
shortestStorage = storages[i];
shortestIndex = i;
}
}
//switch position index & shortestIndex
if (index != shortestIndex) {
nodes[shortestIndex] = nodes[index];
nodes[index] = shortestNode;
storages[shortestIndex] = storages[index];
storages[index] = shortestStorage;
}
writer = shortestNode;
writer = shortestStorage.getDatanodeDescriptor();
}
}
return nodes;
return storages;
}
@Override

View File

@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
@ -64,81 +66,87 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* @return the chosen node
*/
@Override
protected DatanodeDescriptor chooseLocalNode(Node localMachine,
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
StorageType storageType) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
// otherwise try local machine first
if (localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
// otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list
if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
return localDataNode;
for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
localDataNode.getStorageInfos())) {
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
return localStorage;
}
}
}
}
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
if (chosenStorage != null) {
return chosenStorage;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
}
/** @return the node of the second replica */
private static DatanodeDescriptor secondNode(Node localMachine,
List<DatanodeStorageInfo> results) {
// find the second replica
for(DatanodeStorageInfo nextStorage : results) {
DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
if (nextNode != localMachine) {
return nextNode;
}
}
return null;
}
@Override
protected DatanodeDescriptor chooseLocalRack(Node localMachine,
protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results,
avoidStaleNodes);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local rack, but off-nodegroup
try {
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
maxNodesPerRack, results,
avoidStaleNodes);
final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
}
}
final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}
@ -146,8 +154,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine, Set<Node> excludedNodes,
long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
boolean avoidStaleNodes) throws NotEnoughReplicasException {
long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
boolean avoidStaleNodes, StorageType storageType)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
final String rackLocation = NetworkTopology.getFirstHalf(
@ -155,12 +164,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
try {
// randomly choose from remote racks
chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes);
maxReplicasPerRack, results, avoidStaleNodes, storageType);
} catch (NotEnoughReplicasException e) {
// fall back to the local rack
chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
rackLocation, excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes);
maxReplicasPerRack, results, avoidStaleNodes, storageType);
}
}
@ -170,46 +179,40 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
private DatanodeDescriptor chooseLocalNodeGroup(
private DatanodeStorageInfo chooseLocalNodeGroup(
NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
StorageType storageType) throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
// choose one from the local node group
try {
return chooseRandom(
clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageType);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
}
}
final DatanodeDescriptor newLocal = secondNode(localMachine, results);
if (newLocal != null) {
try {
return chooseRandom(
clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results,
avoidStaleNodes);
avoidStaleNodes, storageType);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
maxNodesPerRack, results, avoidStaleNodes, storageType);
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -54,9 +55,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
@InterfaceStability.Evolving
public static class BlockTargetPair {
public final Block block;
public final DatanodeDescriptor[] targets;
public final DatanodeStorageInfo[] targets;
BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
this.block = block;
this.targets = targets;
}
@ -215,14 +216,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false;
}
public DatanodeStorageInfo getStorageInfo(String storageID) {
DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {
return storageMap.get(storageID);
}
}
public Collection<DatanodeStorageInfo> getStorageInfos() {
DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) {
return new ArrayList<DatanodeStorageInfo>(storageMap.values());
final Collection<DatanodeStorageInfo> storages = storageMap.values();
return storages.toArray(new DatanodeStorageInfo[storages.size()]);
}
}
@ -254,14 +256,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
return false;
}
/**
* Used for testing only
* @return the head of the blockList
*/
protected BlockInfo getHead(){
return getBlockIterator().next();
}
/**
* Replace specified old block with a new one in the DataNodeDescriptor.
*
@ -325,20 +319,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
private static class BlockIterator implements Iterator<BlockInfo> {
private final int maxIndex;
private int index = 0;
private List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
private final List<Iterator<BlockInfo>> iterators;
private BlockIterator(final Iterable<DatanodeStorageInfo> storages) {
private BlockIterator(final DatanodeStorageInfo... storages) {
List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator());
}
maxIndex = iterators.size() - 1;
}
private BlockIterator(final DatanodeStorageInfo storage) {
iterators.add(storage.getBlockIterator());
maxIndex = iterators.size() - 1;
this.iterators = Collections.unmodifiableList(iterators);
}
@Override
@ -359,7 +348,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
private void update() {
while(index < maxIndex && !iterators.get(index).hasNext()) {
while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
index++;
}
}
@ -375,7 +364,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
/**
* Store block replication work.
*/
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(new BlockTargetPair(block, targets));
}

View File

@ -32,7 +32,9 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
* by this class.
*/
public class DatanodeStorageInfo {
static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
return toDatanodeInfos(Arrays.asList(storages));
}
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
@ -43,6 +45,22 @@ public class DatanodeStorageInfo {
return datanodes;
}
public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
String[] storageIDs = new String[storages.length];
for(int i = 0; i < storageIDs.length; i++) {
storageIDs[i] = storages[i].getStorageID();
}
return storageIDs;
}
public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
StorageType[] storageTypes = new StorageType[storages.length];
for(int i = 0; i < storageTypes.length; i++) {
storageTypes[i] = storages[i].getStorageType();
}
return storageTypes;
}
/**
* Iterates over the list of blocks belonging to the data-node.
*/
@ -207,6 +225,22 @@ public class DatanodeStorageInfo {
return dn;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
} else if (obj == null || !(obj instanceof DatanodeStorageInfo)) {
return false;
}
final DatanodeStorageInfo that = (DatanodeStorageInfo)obj;
return this.storageID.equals(that.storageID);
}
@Override
public int hashCode() {
return storageID.hashCode();
}
@Override
public String toString() {
return "[" + storageType + "]" + storageID + ":" + state;

View File

@ -76,7 +76,7 @@ class PendingReplicationBlocks {
* @param block The corresponding block
* @param targets The DataNodes where replicas of the block should be placed
*/
void increment(Block block, DatanodeDescriptor[] targets) {
void increment(Block block, DatanodeStorageInfo[] targets) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found == null) {
@ -95,14 +95,14 @@ class PendingReplicationBlocks {
*
* @param The DataNode that finishes the replication
*/
void decrement(Block block, DatanodeDescriptor dn) {
void decrement(Block block, DatanodeDescriptor dn, String storageID) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Removing pending replication for " + block);
}
found.decrementReplicas(dn);
found.decrementReplicas(dn.getStorageInfo(storageID));
if (found.getNumReplicas() <= 0) {
pendingReplications.remove(block);
}
@ -174,12 +174,12 @@ class PendingReplicationBlocks {
*/
static class PendingBlockInfo {
private long timeStamp;
private final List<DatanodeDescriptor> targets;
private final List<DatanodeStorageInfo> targets;
PendingBlockInfo(DatanodeDescriptor[] targets) {
PendingBlockInfo(DatanodeStorageInfo[] targets) {
this.timeStamp = now();
this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
: new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
this.targets = targets == null ? new ArrayList<DatanodeStorageInfo>()
: new ArrayList<DatanodeStorageInfo>(Arrays.asList(targets));
}
long getTimeStamp() {
@ -190,16 +190,16 @@ class PendingReplicationBlocks {
timeStamp = now();
}
void incrementReplicas(DatanodeDescriptor... newTargets) {
void incrementReplicas(DatanodeStorageInfo... newTargets) {
if (newTargets != null) {
for (DatanodeDescriptor dn : newTargets) {
for (DatanodeStorageInfo dn : newTargets) {
targets.add(dn);
}
}
}
void decrementReplicas(DatanodeDescriptor dn) {
targets.remove(dn);
void decrementReplicas(DatanodeStorageInfo storage) {
targets.remove(storage);
}
int getNumReplicas() {

View File

@ -141,6 +141,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -2495,14 +2496,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
// choose targets for the new block to be allocated.
// TODO: chooseTarget(..) should be changed to return DatanodeStorageInfo's
final DatanodeDescriptor chosenDatanodes[] = getBlockManager().chooseTarget(
final DatanodeStorageInfo targets[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
final DatanodeStorageInfo[] targets = new DatanodeStorageInfo[chosenDatanodes.length];
for(int i = 0; i < targets.length; i++) {
final DatanodeDescriptor dd = chosenDatanodes[i];
targets[i] = dd.getStorageInfos().iterator().next();
}
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
@ -2644,7 +2639,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException {
LocatedBlock lBlk = LocatedBlock.createLocatedBlock(
LocatedBlock lBlk = new LocatedBlock(
getExtendedBlock(blk), locs, offset, false);
getBlockManager().setBlockToken(
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
@ -2653,7 +2648,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final Set<Node> excludes,
final DatanodeInfo[] existings, final String[] storageIDs,
final Set<Node> excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
//check if the feature is enabled
@ -2661,7 +2657,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final DatanodeDescriptor clientnode;
final long preferredblocksize;
final List<DatanodeDescriptor> chosen;
final List<DatanodeStorageInfo> chosen;
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
@ -2679,23 +2675,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
clientnode = file.getClientNode();
preferredblocksize = file.getPreferredBlockSize();
//find datanode descriptors
chosen = new ArrayList<DatanodeDescriptor>();
for(DatanodeInfo d : existings) {
final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
).getDatanode(d);
if (descriptor != null) {
chosen.add(descriptor);
}
}
//find datanode storages
final DatanodeManager dm = blockManager.getDatanodeManager();
chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
} finally {
readUnlock();
}
// choose new datanodes.
final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
final DatanodeStorageInfo[] targets = blockManager.getBlockPlacementPolicy(
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
excludes, preferredblocksize);
// TODO: get storage type from the file
excludes, preferredblocksize, StorageType.DEFAULT);
final LocatedBlock lb = new LocatedBlock(blk, targets);
blockManager.setBlockToken(lb, AccessMode.COPY);
return lb;
@ -5634,9 +5625,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
String[] storageIDs = blocks[i].getStorageIDs();
for (int j = 0; j < nodes.length; j++) {
//TODO: add "storageID to LocatedBlock
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], "STORAGE_ID",
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], storageIDs[j],
"client machine reported it");
}
}

View File

@ -566,7 +566,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final DatanodeInfo[] existings, final String[] existingStorageIDs,
final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
if (LOG.isDebugEnabled()) {
@ -587,8 +588,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
excludeSet.add(node);
}
}
return namesystem.getAdditionalDatanode(src, blk,
existings, excludeSet, numAdditionalNodes, clientName);
return namesystem.getAdditionalDatanode(src, blk, existings,
existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
}
/**
* The client needs to give up on the block.

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -61,6 +62,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -158,7 +160,7 @@ public class NamenodeWebHdfsMethods {
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, Configuration conf) throws IOException {
final long blocksize, final Configuration conf) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
if (op == PutOpParam.Op.CREATE) {
@ -166,11 +168,13 @@ public class NamenodeWebHdfsMethods {
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(getRemoteAddress());
if (clientNode != null) {
final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy()
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
new ArrayList<DatanodeDescriptor>(), false, null, blocksize);
if (datanodes.length > 0) {
return datanodes[0];
new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
if (storages.length > 0) {
return storages[0].getDatanodeDescriptor();
}
}
} else if (op == GetOpParam.Op.OPEN

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
/****************************************************
* A BlockCommand is an instruction to a datanode
@ -46,9 +47,10 @@ public class BlockCommand extends DatanodeCommand {
*/
public static final long NO_ACK = Long.MAX_VALUE;
String poolId;
Block blocks[];
DatanodeInfo targets[][];
final String poolId;
final Block[] blocks;
final DatanodeInfo[][] targets;
final String[][] targetStorageIDs;
/**
* Create BlockCommand for transferring blocks to another datanode
@ -60,21 +62,26 @@ public class BlockCommand extends DatanodeCommand {
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
targetStorageIDs = new String[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
targets[i] = p.targets;
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
private static final DatanodeInfo[][] EMPTY_TARGET = {};
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
* Create BlockCommand for the given action
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
this(action, poolId, blocks, EMPTY_TARGET);
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
EMPTY_TARGET_STORAGEIDS);
}
/**
@ -82,11 +89,12 @@ public class BlockCommand extends DatanodeCommand {
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
DatanodeInfo[][] targets) {
DatanodeInfo[][] targets, String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
this.targetStorageIDs = targetStorageIDs;
}
public String getBlockPoolId() {
@ -100,4 +108,8 @@ public class BlockCommand extends DatanodeCommand {
public DatanodeInfo[][] getTargets() {
return targets;
}
public String[][] getTargetStorageIDs() {
return targetStorageIDs;
}
}

View File

@ -141,6 +141,7 @@ message GetAdditionalDatanodeRequestProto {
repeated DatanodeInfoProto excludes = 4;
required uint32 numAdditionalNodes = 5;
required string clientName = 6;
repeated string existingStorageIDs = 7;
}
message GetAdditionalDatanodeResponseProto {

View File

@ -105,10 +105,12 @@ message BlockCommandProto {
INVALIDATE = 2; // Invalidate blocks
SHUTDOWN = 3; // Shutdown the datanode
}
required Action action = 1;
required string blockPoolId = 2;
repeated BlockProto blocks = 3;
repeated DatanodeInfosProto targets = 4;
repeated StorageIDsProto targetStorageIDs = 5;
}
/**

View File

@ -121,6 +121,13 @@ enum StorageTypeProto {
SSD = 2;
}
/**
* A list of storage IDs.
*/
message StorageIDsProto {
repeated string storageIDs = 1;
}
/**
* A LocatedBlock gives information about a block and its location.
*/

View File

@ -860,9 +860,35 @@ public class DFSTestUtil {
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip) {
return createDatanodeStorageInfo(storageID, ip, "defaultRack");
}
public static DatanodeStorageInfo[] createDatanodeStorageInfos(String[] racks) {
return createDatanodeStorageInfos(racks.length, racks);
}
public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n, String... racks) {
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
for(int i = storages.length; i > 0; ) {
final String storageID = "s" + i;
final String ip = i + "." + i + "." + i + "." + i;
i--;
final String rack = i < racks.length? racks[i]: "defaultRack";
storages[i] = createDatanodeStorageInfo(storageID, ip, rack);
}
return storages;
}
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip, String rack) {
final DatanodeStorage storage = new DatanodeStorage(storageID);
return new DatanodeStorageInfo(
getDatanodeDescriptor(ip, "defaultRack"),
new DatanodeStorage(storageID));
BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage), storage);
}
public static DatanodeDescriptor[] toDatanodeDescriptor(
DatanodeStorageInfo[] storages) {
DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
for(int i = 0; i < datanodes.length; i++) {
datanodes[i] = storages[i].getDatanodeDescriptor();
}
return datanodes;
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,

View File

@ -539,8 +539,9 @@ public class TestPBHelper {
dnInfos[0][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
blocks, dnInfos);
blocks, dnInfos, storageIDs);
BlockCommandProto bcProto = PBHelper.convert(bc);
BlockCommand bc2 = PBHelper.convert(bcProto);
assertEquals(bc.getAction(), bc2.getAction());

View File

@ -227,10 +227,16 @@ public class BlockManagerTestUtil {
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, boolean initializeStorage) {
return getDatanodeDescriptor(ipAddr, rackLocation,
initializeStorage? new DatanodeStorage(DatanodeStorage.newStorageID()): null);
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, DatanodeStorage storage) {
DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
if (initializeStorage) {
dn.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
if (storage != null) {
dn.updateStorage(storage);
}
return dn;
}

View File

@ -22,10 +22,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@ -45,7 +49,6 @@ import org.apache.hadoop.net.NetworkTopology;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.mockito.Mockito.*;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
@ -53,6 +56,7 @@ import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
public class TestBlockManager {
private DatanodeStorageInfo[] storages;
private List<DatanodeDescriptor> nodes;
private List<DatanodeDescriptor> rackA;
private List<DatanodeDescriptor> rackB;
@ -81,14 +85,15 @@ public class TestBlockManager {
fsn = Mockito.mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
bm = new BlockManager(fsn, fsn, conf);
nodes = ImmutableList.of(
BlockManagerTestUtil.getDatanodeDescriptor("1.1.1.1", "/rackA", true),
BlockManagerTestUtil.getDatanodeDescriptor("2.2.2.2", "/rackA", true),
BlockManagerTestUtil.getDatanodeDescriptor("3.3.3.3", "/rackA", true),
BlockManagerTestUtil.getDatanodeDescriptor("4.4.4.4", "/rackB", true),
BlockManagerTestUtil.getDatanodeDescriptor("5.5.5.5", "/rackB", true),
BlockManagerTestUtil.getDatanodeDescriptor("6.6.6.6", "/rackB", true)
);
final String[] racks = {
"/rackA",
"/rackA",
"/rackA",
"/rackB",
"/rackB",
"/rackB"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
nodes = Arrays.asList(DFSTestUtil.toDatanodeDescriptor(storages));
rackA = nodes.subList(0, 3);
rackB = nodes.subList(3, 6);
}
@ -125,17 +130,18 @@ public class TestBlockManager {
}
private void doBasicTest(int testIndex) {
List<DatanodeDescriptor> origNodes = getNodes(0, 1);
List<DatanodeStorageInfo> origStorages = getStorages(0, 1);
List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
origNodes.contains(pipeline[0]));
origStorages.contains(pipeline[0]));
assertTrue("Destination of replication should be on the other rack. " +
"Was: " + pipeline[1],
rackB.contains(pipeline[1]));
rackB.contains(pipeline[1].getDatanodeDescriptor()));
}
@ -156,21 +162,22 @@ public class TestBlockManager {
private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission two of the nodes (A1, A2)
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
origNodes.contains(pipeline[0]));
origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 3, pipeline.length);
boolean foundOneOnRackA = false;
for (int i = 1; i < pipeline.length; i++) {
DatanodeDescriptor target = pipeline[i];
DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackA.contains(target)) {
foundOneOnRackA = true;
}
@ -199,22 +206,23 @@ public class TestBlockManager {
private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission all of the nodes
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
origNodes.contains(pipeline[0]));
origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 4, pipeline.length);
boolean foundOneOnRackA = false;
boolean foundOneOnRackB = false;
for (int i = 1; i < pipeline.length; i++) {
DatanodeDescriptor target = pipeline[i];
DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackA.contains(target)) {
foundOneOnRackA = true;
} else if (rackB.contains(target)) {
@ -251,21 +259,22 @@ public class TestBlockManager {
private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
// Block originally on A1, A2, B1
List<DatanodeDescriptor> origNodes = getNodes(0, 1, 3);
List<DatanodeStorageInfo> origStorages = getStorages(0, 1, 3);
List<DatanodeDescriptor> origNodes = getNodes(origStorages);
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
// Decommission all of the nodes in rack A
List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo[] pipeline = scheduleSingleReplication(blockInfo);
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
origNodes.contains(pipeline[0]));
origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 3, pipeline.length);
boolean foundOneOnRackB = false;
for (int i = 1; i < pipeline.length; i++) {
DatanodeDescriptor target = pipeline[i];
DatanodeDescriptor target = pipeline[i].getDatanodeDescriptor();
if (rackB.contains(target)) {
foundOneOnRackB = true;
}
@ -287,9 +296,9 @@ public class TestBlockManager {
rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
addNodes(ImmutableList.of(rackCNode));
try {
DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo[] pipeline2 = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline2.length);
assertEquals(rackCNode, pipeline2[1]);
assertEquals(rackCNode, pipeline2[1].getDatanodeDescriptor());
} finally {
removeNode(rackCNode);
}
@ -311,15 +320,15 @@ public class TestBlockManager {
// Originally on only nodes in rack A.
List<DatanodeDescriptor> origNodes = rackA;
BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
DatanodeStorageInfo pipeline[] = scheduleSingleReplication(blockInfo);
assertEquals(2, pipeline.length); // single new copy
assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0],
origNodes.contains(pipeline[0]));
origNodes.contains(pipeline[0].getDatanodeDescriptor()));
assertTrue("Destination of replication should be on the other rack. " +
"Was: " + pipeline[1],
rackB.contains(pipeline[1]));
rackB.contains(pipeline[1].getDatanodeDescriptor()));
}
@Test
@ -354,19 +363,11 @@ public class TestBlockManager {
* pipeline.
*/
private void fulfillPipeline(BlockInfo blockInfo,
DatanodeDescriptor[] pipeline) throws IOException {
DatanodeStorageInfo[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) {
DatanodeDescriptor dn = pipeline[i];
Iterator<DatanodeStorageInfo> iterator = dn.getStorageInfos().iterator();
if (iterator.hasNext()) {
DatanodeStorageInfo storage = iterator.next();
bm.addBlock(dn, storage.getStorageID(), blockInfo, null);
blockInfo.addStorage(storage);
} else {
throw new RuntimeException("Storage info on node: " + dn.getHostName()
+ " is invalid.");
}
DatanodeStorageInfo storage = pipeline[i];
bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
blockInfo.addStorage(storage);
}
}
@ -389,6 +390,22 @@ public class TestBlockManager {
}
return ret;
}
private List<DatanodeDescriptor> getNodes(List<DatanodeStorageInfo> storages) {
List<DatanodeDescriptor> ret = Lists.newArrayList();
for (DatanodeStorageInfo s : storages) {
ret.add(s.getDatanodeDescriptor());
}
return ret;
}
private List<DatanodeStorageInfo> getStorages(int ... indexes) {
List<DatanodeStorageInfo> ret = Lists.newArrayList();
for (int idx : indexes) {
ret.add(storages[idx]);
}
return ret;
}
private List<DatanodeDescriptor> startDecommission(int ... indexes) {
List<DatanodeDescriptor> nodes = getNodes(indexes);
@ -407,7 +424,7 @@ public class TestBlockManager {
return blockInfo;
}
private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) {
// list for priority 1
List<Block> list_p1 = new ArrayList<Block>();
list_p1.add(block);
@ -425,27 +442,29 @@ public class TestBlockManager {
assertTrue("replication is pending after work is computed",
bm.pendingReplications.getNumReplicas(block) > 0);
LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls = getAllPendingReplications();
assertEquals(1, repls.size());
Entry<DatanodeDescriptor, BlockTargetPair> repl =
Entry<DatanodeStorageInfo, BlockTargetPair> repl =
repls.entries().iterator().next();
DatanodeDescriptor[] targets = repl.getValue().targets;
DatanodeStorageInfo[] targets = repl.getValue().targets;
DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
DatanodeStorageInfo[] pipeline = new DatanodeStorageInfo[1 + targets.length];
pipeline[0] = repl.getKey();
System.arraycopy(targets, 0, pipeline, 1, targets.length);
return pipeline;
}
private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingReplications() {
LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> repls =
LinkedListMultimap.create();
for (DatanodeDescriptor dn : nodes) {
List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
if (thisRepls != null) {
repls.putAll(dn, thisRepls);
for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
repls.putAll(storage, thisRepls);
}
}
}
return repls;
@ -468,7 +487,7 @@ public class TestBlockManager {
addBlockOnNodes(blockId,origNodes.subList(0,1));
List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
assertNotNull("Chooses source node for a highest-priority replication"
+ " even if all available source nodes have reached their replication"
@ -491,7 +510,7 @@ public class TestBlockManager {
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
// Increase the replication count to test replication count > hard limit
DatanodeDescriptor targets[] = { origNodes.get(1) };
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
assertNull("Does not choose a source node for a highest-priority"
@ -507,8 +526,7 @@ public class TestBlockManager {
@Test
public void testSafeModeIBR() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
DatanodeStorageInfo ds = i.next();
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setStorageID(ds.getStorageID());
node.isAlive = true;
@ -552,8 +570,7 @@ public class TestBlockManager {
@Test
public void testSafeModeIBRAfterIncremental() throws Exception {
DatanodeDescriptor node = spy(nodes.get(0));
Iterator<DatanodeStorageInfo> i = node.getStorageInfos().iterator();
DatanodeStorageInfo ds = i.next();
DatanodeStorageInfo ds = node.getStorageInfos()[0];
node.setStorageID(ds.getStorageID());
node.isAlive = true;

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@ -60,9 +59,9 @@ public class TestDatanodeDescriptor {
assertEquals(0, dd.numBlocks());
BlockInfo blk = new BlockInfo(new Block(1L), 1);
BlockInfo blk1 = new BlockInfo(new Block(2L), 2);
Iterator<DatanodeStorageInfo> iterator = dd.getStorageInfos().iterator();
assertTrue(iterator.hasNext());
final String storageID = iterator.next().getStorageID();
DatanodeStorageInfo[] storages = dd.getStorageInfos();
assertTrue(storages.length > 0);
final String storageID = storages[0].getStorageID();
// add first block
assertTrue(dd.addBlock(storageID, blk));
assertEquals(1, dd.numBlocks());

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Test;
/**
@ -62,6 +63,8 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final String storageID = DatanodeStorage.newStorageID();
dd.updateStorage(new DatanodeStorage(storageID));
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =
@ -69,7 +72,7 @@ public class TestHeartbeatHandling {
final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
final DatanodeStorageInfo[] ONE_TARGET = {dd.getStorageInfo(storageID)};
try {
namesystem.writeLock();
@ -143,12 +146,15 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg1 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
dd1.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
final DatanodeRegistration nodeReg2 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
dd2.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
final DatanodeRegistration nodeReg3 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
dd3.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
try {
namesystem.writeLock();
@ -162,9 +168,9 @@ public class TestHeartbeatHandling {
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
final DatanodeStorageInfo[] storages = {
dd1.getStorageInfos().iterator().next(),
dd2.getStorageInfos().iterator().next(),
dd3.getStorageInfos().iterator().next()};
dd1.getStorageInfos()[0],
dd2.getStorageInfos()[0],
dd3.getStorageInfos()[0]};
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY, storages);

View File

@ -43,8 +43,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.junit.Test;
import com.google.common.base.Preconditions;
/**
* This class tests the internals of PendingReplicationBlocks.java,
* as well as how PendingReplicationBlocks acts in BlockManager
@ -54,22 +52,7 @@ public class TestPendingReplication {
private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster
private static final int DATANODE_COUNT = 5;
private DatanodeDescriptor genDatanodeId(int seed) {
seed = seed % 256;
String ip = seed + "." + seed + "." + seed + "." + seed;
return DFSTestUtil.getDatanodeDescriptor(ip, null);
}
private DatanodeDescriptor[] genDatanodes(int number) {
Preconditions.checkArgument(number >= 0);
DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
for (int i = 0; i < number; i++) {
nodes[i] = genDatanodeId(i);
}
return nodes;
}
@Test
public void testPendingReplication() {
PendingReplicationBlocks pendingReplications;
@ -79,9 +62,12 @@ public class TestPendingReplication {
//
// Add 10 blocks to pendingReplications.
//
for (int i = 0; i < 10; i++) {
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
for (int i = 0; i < storages.length; i++) {
Block block = new Block(i, i, 0);
pendingReplications.increment(block, genDatanodes(i));
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
System.arraycopy(storages, 0, targets, 0, i);
pendingReplications.increment(block, targets);
}
assertEquals("Size of pendingReplications ",
10, pendingReplications.size());
@ -91,16 +77,18 @@ public class TestPendingReplication {
// remove one item and reinsert it
//
Block blk = new Block(8, 8, 0);
pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor(),
storages[7].getStorageID()); // removes one replica
assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk));
for (int i = 0; i < 7; i++) {
// removes all replicas
pendingReplications.decrement(blk, genDatanodeId(i));
pendingReplications.decrement(blk, storages[i].getDatanodeDescriptor(),
storages[i].getStorageID());
}
assertTrue(pendingReplications.size() == 9);
pendingReplications.increment(blk, genDatanodes(8));
pendingReplications.increment(blk, DFSTestUtil.createDatanodeStorageInfos(8));
assertTrue(pendingReplications.size() == 10);
//
@ -128,7 +116,7 @@ public class TestPendingReplication {
for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0);
pendingReplications.increment(block, genDatanodes(i));
pendingReplications.increment(block, DFSTestUtil.createDatanodeStorageInfos(i));
}
assertTrue(pendingReplications.size() == 15);

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -35,6 +36,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
@ -44,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -67,6 +70,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
public class TestReplicationPolicy {
{
((Log4JLogger)BlockPlacementPolicy.LOG).getLogger().setLevel(Level.ALL);
}
private Random random = DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
@ -75,7 +82,7 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
private static String[] storageIDs;
private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale,
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@ -86,14 +93,15 @@ public class TestReplicationPolicy {
@BeforeClass
public static void setupCluster() throws Exception {
Configuration conf = new HdfsConfiguration();
dataNodes = new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3")
};
final String[] racks = {
"/d1/r1",
"/d1/r1",
"/d1/r2",
"/d1/r2",
"/d2/r3",
"/d2/r3"};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
@ -124,6 +132,13 @@ public class TestReplicationPolicy {
}
}
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
return isOnSameRack(left, right.getDatanodeDescriptor());
}
private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) {
return cluster.isOnSameRack(left.getDatanodeDescriptor(), right);
}
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@ -139,69 +154,69 @@ public class TestReplicationPolicy {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertEquals(storages[0], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[0], targets[2]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>());
new ArrayList<DatanodeStorageInfo>());
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes) {
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
}
private static DatanodeDescriptor[] chooseTarget(
private static DatanodeStorageInfo[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE);
false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
}
/**
@ -215,8 +230,8 @@ public class TestReplicationPolicy {
@Test
public void testChooseTarget2() throws Exception {
Set<Node> excludedNodes;
DatanodeDescriptor[] targets;
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
DatanodeStorageInfo[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
@ -228,49 +243,52 @@ public class TestReplicationPolicy {
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
assertEquals(storages[0], targets[0]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertEquals(storages[0], targets[0]);
for(int i=1; i<4; i++) {
assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
assertFalse(isOnSameRack(targets[0], targets[i]));
}
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
chosenNodes.add(dataNodes[2]);
chosenNodes.add(storages[2]);
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE);
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for (; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
for (; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@ -289,34 +307,34 @@ public class TestReplicationPolicy {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[1], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[1], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
assertEquals(storages[1], targets[0]);
for(int i=1; i<4; i++) {
assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
assertFalse(isOnSameRack(targets[0], targets[i]));
}
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@ -340,27 +358,27 @@ public class TestReplicationPolicy {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
assertFalse(isOnSameRack(targets[i], dataNodes[0]));
}
assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertTrue(isOnSameRack(targets[0], targets[1]) ||
isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[2]));
for(int i=0; i<2; i++) {
dataNodes[i].updateHeartbeat(
@ -381,7 +399,7 @@ public class TestReplicationPolicy {
DatanodeDescriptor writerDesc =
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, writerDesc);
assertEquals(targets.length, 0);
@ -390,12 +408,12 @@ public class TestReplicationPolicy {
targets = chooseTarget(2, writerDesc);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, writerDesc);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
}
/**
@ -436,7 +454,7 @@ public class TestReplicationPolicy {
// try to choose NUM_OF_DATANODES which is more than actually available
// nodes.
DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES);
assertEquals(targets.length, NUM_OF_DATANODES - 2);
final List<LoggingEvent> log = appender.getLog();
@ -456,18 +474,30 @@ public class TestReplicationPolicy {
}
}
private boolean containsWithinRange(DatanodeDescriptor target,
private boolean containsWithinRange(DatanodeStorageInfo target,
DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].equals(target)) {
if (nodes[i].equals(target.getDatanodeDescriptor())) {
return true;
}
}
return false;
}
private boolean containsWithinRange(DatanodeDescriptor target,
DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
assert startIndex >= 0 && startIndex < nodes.length;
assert endIndex >= startIndex && endIndex < nodes.length;
for (int i = startIndex; i <= endIndex; i++) {
if (nodes[i].getDatanodeDescriptor().equals(target)) {
return true;
}
}
return false;
}
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
@ -476,19 +506,19 @@ public class TestReplicationPolicy {
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
// We set the datanode[0] as stale, thus should choose datanode[1] since
// datanode[1] is on the same rack with datanode[0] (writer)
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
assertEquals(storages[1], targets[0]);
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
// reset
dataNodes[0].setLastUpdate(Time.now());
@ -513,7 +543,7 @@ public class TestReplicationPolicy {
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
DatanodeDescriptor[] targets = chooseTarget(0);
DatanodeStorageInfo[] targets = chooseTarget(0);
assertEquals(targets.length, 0);
// Since we have 6 datanodes total, stale nodes should
@ -585,11 +615,12 @@ public class TestReplicationPolicy {
.getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
BLOCK_SIZE, StorageType.DEFAULT);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
// Step 2. Set more than half of the datanodes as stale
for (int i = 0; i < 4; i++) {
@ -610,10 +641,11 @@ public class TestReplicationPolicy {
assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
StorageType.DEFAULT);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
assertTrue(isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
@ -635,7 +667,7 @@ public class TestReplicationPolicy {
// Call chooseTarget
targets = chooseTarget(3, staleNodeInfo);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
assertFalse(isOnSameRack(targets[0], staleNodeInfo));
} finally {
miniCluster.shutdown();
}
@ -650,26 +682,26 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate1() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@ -681,22 +713,22 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate2() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[1]);
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[1]);
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[1], dataNodes[0]));
}
/**
@ -708,31 +740,31 @@ public class TestReplicationPolicy {
*/
@Test
public void testRereplicate3() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[2]);
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[2]);
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(targets[0], dataNodes[2]));
targets = chooseTarget(1, dataNodes[2], chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertTrue(isOnSameRack(targets[0], dataNodes[0]));
targets = chooseTarget(2, dataNodes[2], chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
assertTrue(isOnSameRack(targets[0], dataNodes[2]));
}
/**
@ -1174,4 +1206,4 @@ public class TestReplicationPolicy {
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
@ -57,41 +58,57 @@ public class TestReplicationPolicyWithNodeGroup {
private BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
};
private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
};
private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
private static final DatanodeStorageInfo[] storages;
private static final DatanodeDescriptor[] dataNodes;
static {
final String[] racks = {
"/d1/r1/n1",
"/d1/r1/n1",
"/d1/r1/n2",
"/d1/r2/n3",
"/d1/r2/n3",
"/d1/r2/n4",
"/d2/r3/n5",
"/d2/r3/n6"
};
storages = DFSTestUtil.createDatanodeStorageInfos(racks);
dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
}
private static final DatanodeStorageInfo[] storagesInBoundaryCase;
private static final DatanodeDescriptor[] dataNodesInBoundaryCase;
static {
final String[] racksInBoundaryCase = {
"/d1/r1/n1",
"/d1/r1/n1",
"/d1/r1/n1",
"/d1/r1/n2",
"/d1/r2/n3",
"/d1/r2/n3"
};
storagesInBoundaryCase = DFSTestUtil.createDatanodeStorageInfos(racksInBoundaryCase);
dataNodesInBoundaryCase = DFSTestUtil.toDatanodeDescriptor(storagesInBoundaryCase);
}
private static final DatanodeStorageInfo[] storagesInMoreTargetsCase;
private final static DatanodeDescriptor[] dataNodesInMoreTargetsCase;
static {
final String[] racksInMoreTargetsCase = {
"/r1/n1",
"/r1/n1",
"/r1/n2",
"/r1/n2",
"/r1/n3",
"/r1/n3",
"/r2/n4",
"/r2/n4",
"/r2/n5",
"/r2/n5",
"/r2/n6",
"/r2/n6"
};
storagesInMoreTargetsCase = DFSTestUtil.createDatanodeStorageInfos(racksInMoreTargetsCase);
dataNodesInMoreTargetsCase = DFSTestUtil.toDatanodeDescriptor(storagesInMoreTargetsCase);
};
private final static DatanodeDescriptor NODE =
@ -142,11 +159,12 @@ public class TestReplicationPolicyWithNodeGroup {
* Return false if two targets are found on the same NodeGroup.
*/
private static boolean checkTargetsOnDifferentNodeGroup(
DatanodeDescriptor[] targets) {
DatanodeStorageInfo[] targets) {
if(targets.length == 0)
return true;
Set<String> targetSet = new HashSet<String>();
for(DatanodeDescriptor node:targets) {
for(DatanodeStorageInfo storage:targets) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
if(targetSet.contains(nodeGroup)) {
return false;
@ -156,34 +174,50 @@ public class TestReplicationPolicyWithNodeGroup {
}
return true;
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) {
return isOnSameRack(left.getDatanodeDescriptor(), right);
}
private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) {
return cluster.isOnSameRack(left, right.getDatanodeDescriptor());
}
private boolean isOnSameNodeGroup(DatanodeStorageInfo left, DatanodeStorageInfo right) {
return isOnSameNodeGroup(left.getDatanodeDescriptor(), right);
}
private boolean isOnSameNodeGroup(DatanodeDescriptor left, DatanodeStorageInfo right) {
return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor());
}
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>());
new ArrayList<DatanodeStorageInfo>());
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes) {
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
private DatanodeDescriptor[] chooseTarget(
private DatanodeStorageInfo[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
List<DatanodeStorageInfo> chosenNodes,
Set<Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE);
false, excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
}
/**
@ -201,32 +235,36 @@ public class TestReplicationPolicyWithNodeGroup {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
assertEquals(storages[0], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
assertEquals(storages[0], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameNodeGroup(targets[1], targets[2]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertEquals(storages[0], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[0], targets[2]));
// Make sure no more than one replicas are on the same nodegroup
verifyNoTwoTargetsOnSameNodeGroup(targets);
@ -235,10 +273,10 @@ public class TestReplicationPolicyWithNodeGroup {
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeStorageInfo[] targets) {
Set<String> nodeGroupSet = new HashSet<String>();
for (DatanodeDescriptor target: targets) {
nodeGroupSet.add(target.getNetworkLocation());
for (DatanodeStorageInfo target: targets) {
nodeGroupSet.add(target.getDatanodeDescriptor().getNetworkLocation());
}
assertEquals(nodeGroupSet.size(), targets.length);
}
@ -253,36 +291,37 @@ public class TestReplicationPolicyWithNodeGroup {
*/
@Test
public void testChooseTarget2() throws Exception {
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
Set<Node> excludedNodes = new HashSet<Node>();
excludedNodes.add(dataNodes[1]);
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE);
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertEquals(storages[0], targets[0]);
assertTrue(cluster.isNodeGroupAware());
// Make sure no replicas are on the same nodegroup
for (int i=1;i<4;i++) {
assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
assertFalse(isOnSameNodeGroup(targets[0], targets[i]));
}
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
assertFalse(isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.add(dataNodes[1]);
chosenNodes.add(dataNodes[2]);
chosenNodes.add(storages[2]);
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE);
excludedNodes, BLOCK_SIZE, StorageType.DEFAULT);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
for(; i < targets.length && !storages[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
@ -301,32 +340,32 @@ public class TestReplicationPolicyWithNodeGroup {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
assertEquals(storages[1], targets[0]);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[1], targets[0]);
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertEquals(storages[1], targets[0]);
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
assertEquals(storages[1], targets[0]);
assertTrue(cluster.isNodeGroupAware());
verifyNoTwoTargetsOnSameNodeGroup(targets);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertTrue(isOnSameRack(targets[1], targets[2]) ||
isOnSameRack(targets[2], targets[3]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@ -351,28 +390,28 @@ public class TestReplicationPolicyWithNodeGroup {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
assertFalse(isOnSameRack(dataNodes[0], targets[i]));
}
verifyNoTwoTargetsOnSameNodeGroup(targets);
assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertTrue(isOnSameRack(targets[0], targets[1]) ||
isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@ -385,7 +424,7 @@ public class TestReplicationPolicyWithNodeGroup {
@Test
public void testChooseTarget5() throws Exception {
setupDataNodeCapacity();
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, NODE);
assertEquals(targets.length, 0);
@ -394,12 +433,12 @@ public class TestReplicationPolicyWithNodeGroup {
targets = chooseTarget(2, NODE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, NODE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(targets[1], targets[2]));
assertFalse(isOnSameRack(targets[0], targets[1]));
verifyNoTwoTargetsOnSameNodeGroup(targets);
}
@ -413,27 +452,27 @@ public class TestReplicationPolicyWithNodeGroup {
@Test
public void testRereplicate1() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
assertTrue(isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(targets[0], targets[2]));
}
/**
@ -446,22 +485,22 @@ public class TestReplicationPolicyWithNodeGroup {
@Test
public void testRereplicate2() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[1]);
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[1]);
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
cluster.isOnSameRack(dataNodes[0], targets[1]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]) &&
isOnSameRack(dataNodes[0], targets[1]));
}
/**
@ -474,33 +513,33 @@ public class TestReplicationPolicyWithNodeGroup {
@Test
public void testRereplicate3() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[3]);
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storages[0]);
chosenNodes.add(storages[3]);
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
assertTrue(isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameRack(dataNodes[3], targets[0]));
targets = chooseTarget(1, dataNodes[3], chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertTrue(isOnSameRack(dataNodes[3], targets[0]));
assertFalse(isOnSameNodeGroup(dataNodes[3], targets[0]));
assertFalse(isOnSameRack(dataNodes[0], targets[0]));
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
assertTrue(isOnSameRack(dataNodes[0], targets[0]));
assertFalse(isOnSameNodeGroup(dataNodes[0], targets[0]));
targets = chooseTarget(2, dataNodes[3], chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
assertTrue(isOnSameRack(dataNodes[3], targets[0]));
}
/**
@ -583,7 +622,7 @@ public class TestReplicationPolicyWithNodeGroup {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 0);
@ -592,7 +631,7 @@ public class TestReplicationPolicyWithNodeGroup {
targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertFalse(isOnSameRack(targets[0], targets[1]));
targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 3);
@ -613,15 +652,13 @@ public class TestReplicationPolicyWithNodeGroup {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodesInBoundaryCase[0]);
chosenNodes.add(dataNodesInBoundaryCase[5]);
DatanodeDescriptor[] targets;
List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>();
chosenNodes.add(storagesInBoundaryCase[0]);
chosenNodes.add(storagesInBoundaryCase[5]);
DatanodeStorageInfo[] targets;
targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
assertFalse(cluster.isOnSameNodeGroup(targets[0],
dataNodesInBoundaryCase[0]));
assertFalse(cluster.isOnSameNodeGroup(targets[0],
dataNodesInBoundaryCase[5]));
assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[0], targets[0]));
assertFalse(isOnSameNodeGroup(dataNodesInBoundaryCase[5], targets[0]));
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
}
@ -654,7 +691,7 @@ public class TestReplicationPolicyWithNodeGroup {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
DatanodeStorageInfo[] targets;
// Test normal case -- 3 replicas
targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
assertEquals(targets.length, 3);