HDFS-5009. Include storage information in the LocatedBlock.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1519691 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-09-03 14:03:35 +00:00
parent f7e3bc553d
commit 3f070e83b1
31 changed files with 269 additions and 186 deletions

View File

@ -13,5 +13,7 @@ IMPROVEMENTS:
HDFS-4987. Namenode changes to track multiple storages per datanode.
(szetszwo)
HDFS-5154. Fix TestBlockManager and TestDatanodeDescriptor after HDFS-4987.
HDFS-5154. Fix TestBlockManager and TestDatanodeDescriptor after HDFS-4987.
(Junping Du via szetszwo)
HDFS-5009. Include storage information in the LocatedBlock. (szetszwo)

View File

@ -85,7 +85,6 @@
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@ -312,6 +311,8 @@ class DataStreamer extends Daemon {
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
//TODO: update storage IDs
private volatile String[] storageIDs = null;
private LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
CacheBuilder.newBuilder()
.expireAfterWrite(
@ -1039,7 +1040,8 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
// update pipeline at the namenode
ExtendedBlock newBlock = new ExtendedBlock(
block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, nodes);
dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
nodes, storageIDs);
// update client side generation stamp
block = newBlock;
}

View File

@ -982,7 +982,7 @@ public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
*/
@AtMostOnce
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException;
/**

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.security.token.Token;
/**
@ -35,6 +36,8 @@ public class LocatedBlock {
private ExtendedBlock b;
private long offset; // offset of the first byte of the block in the file
private DatanodeInfo[] locs;
/** Storage ID for each replica */
private String[] storageIDs;
// Storage type for each replica, if reported.
private StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
@ -53,10 +56,22 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset) {
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset,
boolean corrupt) {
this(b, locs, null, startOffset, corrupt);
this(b, locs, null, null, startOffset, corrupt);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
public static LocatedBlock createLocatedBlock(ExtendedBlock b,
DatanodeStorageInfo[] storages, long startOffset, boolean corrupt) {
final DatanodeInfo[] locs = new DatanodeInfo[storages.length];
final String[] storageIDs = new String[storages.length];
final StorageType[] storageType = new StorageType[storages.length];
for(int i = 0; i < storages.length; i++) {
locs[i] = storages[i].getDatanodeDescriptor();
storageIDs[i] = storages[i].getStorageID();
storageType[i] = storages[i].getStorageType();
}
return new LocatedBlock(b, locs, storageIDs, storageType, startOffset, corrupt);
}
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
StorageType[] storageTypes, long startOffset,
boolean corrupt) {
this.b = b;
@ -67,6 +82,7 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs,
} else {
this.locs = locs;
}
this.storageIDs = storageIDs;
this.storageTypes = storageTypes;
}
@ -94,6 +110,10 @@ public StorageType[] getStorageTypes() {
return storageTypes;
}
public String[] getStorageIDs() {
return storageIDs;
}
public long getStartOffset() {
return offset;
}

View File

@ -813,10 +813,12 @@ public UpdatePipelineResponseProto updatePipeline(RpcController controller,
UpdatePipelineRequestProto req) throws ServiceException {
try {
List<DatanodeIDProto> newNodes = req.getNewNodesList();
server
.updatePipeline(req.getClientName(), PBHelper.convert(req
.getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper
.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])));
List<String> newStorageIDs = req.getStorageIDsList();
server.updatePipeline(req.getClientName(),
PBHelper.convert(req.getOldBlock()),
PBHelper.convert(req.getNewBlock()),
PBHelper.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])),
newStorageIDs.toArray(new String[newStorageIDs.size()]));
return VOID_UPDATEPIPELINE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -780,12 +780,13 @@ public LocatedBlock updateBlockForPipeline(ExtendedBlock block,
@Override
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException {
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException {
UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder()
.setClientName(clientName)
.setOldBlock(PBHelper.convert(oldBlock))
.setNewBlock(PBHelper.convert(newBlock))
.addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes)))
.addAllStorageIDs(Arrays.asList(storageIDs))
.build();
try {
rpcProxy.updatePipeline(null, req);

View File

@ -575,6 +575,8 @@ public static LocatedBlockProto convert(LocatedBlock b) {
builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
}
}
builder.addAllStorageIDs(Arrays.asList(b.getStorageIDs()));
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@ -602,6 +604,7 @@ public static LocatedBlock convert(LocatedBlockProto proto) {
}
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
proto.getStorageIDsList().toArray(new String[proto.getStorageIDsCount()]),
storageTypes, proto.getOffset(), proto.getCorrupt());
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;

View File

@ -356,10 +356,10 @@ public boolean isComplete() {
* @return BlockInfoUnderConstruction - an under construction block.
*/
public BlockInfoUnderConstruction convertToBlockUnderConstruction(
BlockUCState s, DatanodeDescriptor[] targets) {
BlockUCState s, DatanodeStorageInfo[] targets) {
if(isComplete()) {
return new BlockInfoUnderConstruction(
this, getBlockCollection().getBlockReplication(), s, targets);
return new BlockInfoUnderConstruction(this,
getBlockCollection().getBlockReplication(), s, targets);
}
// the block is already under construction
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;

View File

@ -19,7 +19,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -64,12 +63,12 @@ public class BlockInfoUnderConstruction extends BlockInfo {
* corresponding replicas.
*/
static class ReplicaUnderConstruction extends Block {
private DatanodeDescriptor expectedLocation;
private final DatanodeStorageInfo expectedLocation;
private ReplicaState state;
private boolean chosenAsPrimary;
ReplicaUnderConstruction(Block block,
DatanodeDescriptor target,
DatanodeStorageInfo target,
ReplicaState state) {
super(block);
this.expectedLocation = target;
@ -83,7 +82,7 @@ static class ReplicaUnderConstruction extends Block {
* It is not guaranteed, but expected, that the data-node actually has
* the replica.
*/
DatanodeDescriptor getExpectedLocation() {
private DatanodeStorageInfo getExpectedStorageLocation() {
return expectedLocation;
}
@ -119,7 +118,7 @@ void setChosenAsPrimary(boolean chosenAsPrimary) {
* Is data-node the replica belongs to alive.
*/
boolean isAlive() {
return expectedLocation.isAlive;
return expectedLocation.getDatanodeDescriptor().isAlive;
}
@Override // Block
@ -163,7 +162,7 @@ public BlockInfoUnderConstruction(Block blk, int replication) {
*/
public BlockInfoUnderConstruction(Block blk, int replication,
BlockUCState state,
DatanodeDescriptor[] targets) {
DatanodeStorageInfo[] targets) {
super(blk, replication);
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockInfoUnderConstruction cannot be in COMPLETE state";
@ -187,7 +186,7 @@ assert getBlockUCState() != BlockUCState.COMPLETE :
}
/** Set expected locations */
public void setExpectedLocations(DatanodeDescriptor[] targets) {
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
for(int i = 0; i < numLocations; i++)
@ -199,12 +198,12 @@ public void setExpectedLocations(DatanodeDescriptor[] targets) {
* Create array of expected replica locations
* (as has been assigned by chooseTargets()).
*/
public DatanodeDescriptor[] getExpectedLocations() {
public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = replicas == null ? 0 : replicas.size();
DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
for(int i = 0; i < numLocations; i++)
locations[i] = replicas.get(i).getExpectedLocation();
return locations;
storages[i] = replicas.get(i).getExpectedStorageLocation();
return storages;
}
/** Get the number of expected locations */
@ -279,27 +278,29 @@ public void initializeBlockRecovery(long recoveryId) {
if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
continue;
}
if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
primary = replicas.get(i);
final ReplicaUnderConstruction ruc = replicas.get(i);
final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
if (lastUpdate > mostRecentLastUpdate) {
primaryNodeIndex = i;
mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
primary = ruc;
mostRecentLastUpdate = lastUpdate;
}
}
if (primary != null) {
primary.getExpectedLocation().addBlockToBeRecovered(this);
primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info("BLOCK* " + this
+ " recovery started, primary=" + primary);
}
}
void addReplicaIfNotPresent(DatanodeDescriptor dn,
void addReplicaIfNotPresent(DatanodeStorageInfo storage,
Block block,
ReplicaState rState) {
for(ReplicaUnderConstruction r : replicas)
if(r.getExpectedLocation() == dn)
if(r.getExpectedStorageLocation() == storage)
return;
replicas.add(new ReplicaUnderConstruction(block, dn, rState));
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
}
@Override // BlockInfo

View File

@ -500,9 +500,8 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
jt.hasNext();) {
DatanodeDescriptor node = jt.next();
for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
String state = "";
if (corruptNodes != null && corruptNodes.contains(node)) {
state = "(corrupt)";
@ -662,10 +661,9 @@ public LocatedBlock convertLastBlockToUnderConstruction(
assert oldBlock == getStoredBlock(oldBlock) :
"last block of the file is not in blocksMap";
DatanodeDescriptor[] targets = getNodes(oldBlock);
DatanodeStorageInfo[] targets = getStorages(oldBlock);
BlockInfoUnderConstruction ucBlock =
bc.setLastBlock(oldBlock, targets);
BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets);
blocksMap.replaceBlock(ucBlock);
// Remove block from replication queue.
@ -675,9 +673,8 @@ public LocatedBlock convertLastBlockToUnderConstruction(
pendingReplications.remove(ucBlock);
// remove this block from the list of pending blocks to be deleted.
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
invalidateBlocks.remove(datanodeId, oldBlock);
for (DatanodeStorageInfo storage : targets) {
invalidateBlocks.remove(storage.getStorageID(), oldBlock);
}
// Adjust safe-mode totals, since under-construction blocks don't
@ -699,9 +696,8 @@ public LocatedBlock convertLastBlockToUnderConstruction(
private List<String> getValidLocations(Block block) {
ArrayList<String> machineSet =
new ArrayList<String>(blocksMap.numNodes(block));
for(Iterator<DatanodeDescriptor> it =
blocksMap.nodeIterator(block); it.hasNext();) {
String storageID = it.next().getStorageID();
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final String storageID = storage.getStorageID();
// filter invalidate replicas
if(!invalidateBlocks.contains(storageID, block)) {
machineSet.add(storageID);
@ -775,9 +771,9 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
+ ", blk=" + blk);
}
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk;
final DatanodeDescriptor[] locations = uc.getExpectedLocations();
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, locations, pos, false);
return LocatedBlock.createLocatedBlock(eb, storages, pos, false);
}
// get block locations
@ -795,9 +791,8 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
int j = 0;
if (numMachines > 0) {
for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
it.hasNext();) {
final DatanodeDescriptor d = it.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
if (isCorrupt || (!isCorrupt && !replicaCorrupt))
machines[j++] = d;
@ -1017,9 +1012,8 @@ void addToInvalidates(final Block block, final DatanodeInfo datanode) {
*/
private void addToInvalidates(Block b) {
StringBuilder datanodes = new StringBuilder();
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
.hasNext();) {
DatanodeDescriptor node = it.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
invalidateBlocks.add(b, node, false);
datanodes.append(node).append(" ");
}
@ -1466,10 +1460,10 @@ DatanodeDescriptor chooseSourceDatanode(
int decommissioned = 0;
int corrupt = 0;
int excess = 0;
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
while(it.hasNext()) {
DatanodeDescriptor node = it.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
LightWeightLinkedSet<Block> excessBlocks =
excessReplicateMap.get(node.getStorageID());
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@ -1790,7 +1784,7 @@ private void processFirstBlockReport(final DatanodeDescriptor node,
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
node, iblk, reportedState);
node.getStorageInfo(storageID), iblk, reportedState);
//and fall through to next clause
}
//add replica if appropriate
@ -2093,7 +2087,7 @@ void addStoredBlockUnderConstruction(
DatanodeDescriptor node, String storageID,
ReplicaState reportedState)
throws IOException {
block.addReplicaIfNotPresent(node, block, reportedState);
block.addReplicaIfNotPresent(node.getStorageInfo(storageID), block, reportedState);
if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
addStoredBlock(block, node, storageID, null, true);
}
@ -2425,9 +2419,8 @@ private void processOverReplicatedBlock(final Block block,
Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
.getNodes(block);
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (cur.areBlockContentsStale()) {
LOG.info("BLOCK* processOverReplicatedBlock: " +
"Postponing processing of over-replicated " +
@ -2747,10 +2740,9 @@ public NumberReplicas countNodes(Block b) {
int corrupt = 0;
int excess = 0;
int stale = 0;
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++;
} else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
@ -2787,10 +2779,9 @@ int countLiveNodes(BlockInfo b) {
}
// else proceed with fast case
int live = 0;
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
live++;
}
@ -2802,10 +2793,9 @@ private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
int curReplicas = num.liveReplicas();
int curExpectedReplicas = getReplication(block);
BlockCollection bc = blocksMap.getBlockCollection(block);
Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(block);
StringBuilder nodeList = new StringBuilder();
while (nodeIter.hasNext()) {
DatanodeDescriptor node = nodeIter.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
nodeList.append(node);
nodeList.append(" ");
}
@ -2902,14 +2892,13 @@ public int getActiveBlockCount() {
return blocksMap.size();
}
public DatanodeDescriptor[] getNodes(BlockInfo block) {
DatanodeDescriptor[] nodes =
new DatanodeDescriptor[block.numNodes()];
Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
for (int i = 0; it != null && it.hasNext(); i++) {
nodes[i] = it.next();
public DatanodeStorageInfo[] getStorages(BlockInfo block) {
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
int i = 0;
for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
storages[i++] = s;
}
return nodes;
return storages;
}
public int getTotalBlocks() {
@ -3038,9 +3027,8 @@ boolean blockHasEnoughRacks(Block b) {
corruptReplicas.getNodes(b);
int numExpectedReplicas = getReplication(b);
String rackName = null;
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
if (numExpectedReplicas == 1 ||
@ -3084,8 +3072,8 @@ public BlockCollection getBlockCollection(Block b) {
}
/** @return an iterator of the datanodes. */
public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
return blocksMap.nodeIterator(block);
public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
return blocksMap.getStorages(block);
}
public int numCorruptReplicas(Block block) {

View File

@ -29,11 +29,11 @@
* the datanodes that store the block.
*/
class BlocksMap {
private static class NodeIterator implements Iterator<DatanodeDescriptor> {
private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
private BlockInfo blockInfo;
private int nextIdx = 0;
NodeIterator(BlockInfo blkInfo) {
StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@ -44,8 +44,8 @@ public boolean hasNext() {
}
@Override
public DatanodeDescriptor next() {
return blockInfo.getDatanode(nextIdx++);
public DatanodeStorageInfo next() {
return blockInfo.getStorageInfo(nextIdx++);
}
@Override
@ -115,18 +115,23 @@ BlockInfo getStoredBlock(Block b) {
/**
* Searches for the block in the BlocksMap and
* returns Iterator that iterates through the nodes the block belongs to.
* returns {@link Iterable} that iterates through the nodes the block belongs to.
*/
Iterator<DatanodeDescriptor> nodeIterator(Block b) {
return nodeIterator(blocks.get(b));
Iterable<DatanodeStorageInfo> getStorages(Block b) {
return getStorages(blocks.get(b));
}
/**
* For a block that has already been retrieved from the BlocksMap
* returns Iterator that iterates through the nodes the block belongs to.
* returns {@link Iterable} that iterates through the nodes the block belongs to.
*/
Iterator<DatanodeDescriptor> nodeIterator(BlockInfo storedBlock) {
return new NodeIterator(storedBlock);
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
return new StorageIterator(storedBlock);
}
};
}
/** counts number of containing nodes. Better than using iterator. */

View File

@ -23,7 +23,13 @@
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -48,10 +54,17 @@
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
@ -423,6 +436,20 @@ public DatanodeDescriptor getDatanode(DatanodeID nodeID
return node;
}
public DatanodeStorageInfo[] getDatanodeStorageInfos(
DatanodeID[] datanodeID, String[] storageIDs)
throws UnregisteredNodeException {
if (datanodeID.length == 0) {
return null;
}
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
for(int i = 0; i < datanodeID.length; i++) {
final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
storages[i] = dd.getStorageInfo(storageIDs[i]);
}
return storages;
}
/** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) {
@ -1151,32 +1178,32 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
List<DatanodeDescriptor> recoveryLocations =
new ArrayList<DatanodeDescriptor>(expectedLocations.length);
for (int i = 0; i < expectedLocations.length; i++) {
if (!expectedLocations[i].isStale(this.staleInterval)) {
recoveryLocations.add(expectedLocations[i]);
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<DatanodeStorageInfo>(storages.length);
for (int i = 0; i < storages.length; i++) {
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storages[i]);
}
}
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != expectedLocations.length) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(expectedLocations.length - recoveryLocations.size()));
(storages.length - recoveryLocations.size()));
}
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
b.getBlockRecoveryId()));
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b),
expectedLocations,
DatanodeStorageInfo.toDatanodeInfos(storages),
b.getBlockRecoveryId()));
}
}

View File

@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -29,6 +32,17 @@
* by this class.
*/
public class DatanodeStorageInfo {
static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
return toDatanodeInfos(Arrays.asList(storages));
}
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()];
for(int i = 0; i < storages.size(); i++) {
datanodes[i] = storages.get(i).getDatanodeDescriptor();
}
return datanodes;
}
/**
* Iterates over the list of blocks belonging to the data-node.
*/
@ -65,7 +79,7 @@ public void remove() {
private long remaining;
private volatile BlockInfo blockList = null;
DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
this.dn = dn;
this.storageID = s.getStorageID();
this.storageType = s.getStorageType();
@ -92,6 +106,10 @@ public String getStorageID() {
return storageID;
}
public StorageType getStorageType() {
return storageType;
}
public long getCapacity() {
return capacity;
}

View File

@ -34,5 +34,5 @@ public interface MutableBlockCollection extends BlockCollection {
* and set the locations.
*/
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] locations) throws IOException;
DatanodeStorageInfo[] storages) throws IOException;
}

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
@ -345,7 +346,7 @@ INodeFile unprotectedAddFile( long id,
* Add a block to the file. Returns a reference to the added block.
*/
BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
DatanodeDescriptor targets[]) throws IOException {
DatanodeStorageInfo[] targets) throws IOException {
waitForReady();
writeLock();

View File

@ -168,6 +168,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -2484,8 +2485,14 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
}
// choose targets for the new block to be allocated.
final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
// TODO: chooseTarget(..) should be changed to return DatanodeStorageInfo's
final DatanodeDescriptor chosenDatanodes[] = getBlockManager().chooseTarget(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
final DatanodeStorageInfo[] targets = new DatanodeStorageInfo[chosenDatanodes.length];
for(int i = 0; i < targets.length; i++) {
final DatanodeDescriptor dd = chosenDatanodes[i];
targets[i] = dd.getStorageInfos().iterator().next();
}
// Part II.
// Allocate a new block, add it to the INode and the BlocksMap.
@ -2607,7 +2614,7 @@ INodesInPath analyzeFileState(String src,
src + ". Returning previously allocated block " + lastBlockInFile);
long offset = pendingFile.computeFileSize();
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(),
offset);
return iip;
} else {
@ -2625,11 +2632,10 @@ INodesInPath analyzeFileState(String src,
return iip;
}
LocatedBlock makeLocatedBlock(Block blk,
DatanodeInfo[] locs,
LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs,
long offset) throws IOException {
LocatedBlock lBlk = new LocatedBlock(
getExtendedBlock(blk), locs, offset);
LocatedBlock lBlk = LocatedBlock.createLocatedBlock(
getExtendedBlock(blk), locs, offset, false);
getBlockManager().setBlockToken(
lBlk, BlockTokenSecretManager.AccessMode.WRITE);
return lBlk;
@ -2852,13 +2858,14 @@ private boolean completeFileInternal(String src,
* @throws QuotaExceededException If addition of block exceeds space quota
*/
BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath,
Block newBlock, DatanodeDescriptor targets[]) throws IOException {
Block newBlock, DatanodeStorageInfo[] targets)
throws IOException {
assert hasWriteLock();
BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets);
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
+ getBlockPoolId() + " " + b);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
for (DatanodeStorageInfo storage : targets) {
storage.getDatanodeDescriptor().incBlocksScheduled();
}
return b;
}
@ -3622,7 +3629,7 @@ boolean internalReleaseLease(Lease lease, String src,
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock;
// setup the last block locations from the blockManager if not known
if (uc.getNumExpectedLocations() == 0) {
uc.setExpectedLocations(blockManager.getNodes(lastBlock));
uc.setExpectedLocations(blockManager.getStorages(lastBlock));
}
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc));
@ -3777,24 +3784,18 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
// file is underConstruction
DatanodeDescriptor[] descriptors = null;
if (newtargets.length > 0) {
descriptors = new DatanodeDescriptor[newtargets.length];
for(int i = 0; i < newtargets.length; i++) {
descriptors[i] = blockManager.getDatanodeManager().getDatanode(
newtargets[i]);
}
}
if ((closeFile) && (descriptors != null)) {
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
.getDatanodeStorageInfos(newtargets, newtargetstorages);
if (closeFile && storages != null) {
// the file is getting closed. Insert block locations into blockManager.
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < descriptors.length; i++) {
descriptors[i].addBlock(newtargetstorages[i], storedBlock);
for (int i = 0; i < storages.length; i++) {
storages[i].addBlock(storedBlock);
}
}
// add pipeline locations into the INodeUnderConstruction
pendingFile.setLastBlock(storedBlock, descriptors);
pendingFile.setLastBlock(storedBlock, storages);
}
if (closeFile) {
@ -5639,7 +5640,7 @@ LocatedBlock updateBlockForPipeline(ExtendedBlock block,
* @throws IOException if any error occurs
*/
void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
if (cacheEntry != null && cacheEntry.isSuccess()) {
@ -5662,7 +5663,7 @@ void updatePipeline(String clientName, ExtendedBlock oldBlock,
assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " has different block identifier";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
cacheEntry != null);
newStorageIDs, cacheEntry != null);
success = true;
} finally {
writeUnlock();
@ -5674,7 +5675,8 @@ void updatePipeline(String clientName, ExtendedBlock oldBlock,
/** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache)
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs,
boolean logRetryCache)
throws IOException {
assert hasWriteLock();
// check the vadility of the block and lease holder name
@ -5698,15 +5700,9 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
blockinfo.setNumBytes(newBlock.getNumBytes());
// find the DatanodeDescriptor objects
final DatanodeManager dm = getBlockManager().getDatanodeManager();
DatanodeDescriptor[] descriptors = null;
if (newNodes.length > 0) {
descriptors = new DatanodeDescriptor[newNodes.length];
for(int i = 0; i < newNodes.length; i++) {
descriptors[i] = dm.getDatanode(newNodes[i]);
}
}
blockinfo.setExpectedLocations(descriptors);
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
.getDatanodeStorageInfos(newNodes, newStorageIDs);
blockinfo.setExpectedLocations(storages);
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
@ -180,7 +181,7 @@ boolean removeLastBlock(Block oldblock) throws IOException {
*/
@Override
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeDescriptor[] targets) throws IOException {
DatanodeStorageInfo[] targets) throws IOException {
if (numBlocks() == 0) {
throw new IOException("Failed to set last block: File is empty.");
}

View File

@ -611,9 +611,9 @@ public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientNam
@Override // ClientProtocol
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs);
}
@Override // DatanodeProtocol

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -1065,11 +1066,10 @@ public void toXML(XMLOutputter doc) throws IOException {
}
doc.startTag("replicas");
for(final Iterator<DatanodeDescriptor> it = blockManager.datanodeIterator(block);
it.hasNext(); ) {
for(DatanodeStorageInfo storage : blockManager.getStorages(block)) {
doc.startTag("replica");
DatanodeDescriptor dd = it.next();
DatanodeDescriptor dd = storage.getDatanodeDescriptor();
doc.startTag("host_name");
doc.pcdata(dd.getHostName());

View File

@ -437,6 +437,7 @@ message UpdatePipelineRequestProto {
required ExtendedBlockProto oldBlock = 2;
required ExtendedBlockProto newBlock = 3;
repeated DatanodeIDProto newNodes = 4;
repeated string storageIDs = 5;
}
message UpdatePipelineResponseProto { // void response

View File

@ -134,6 +134,7 @@ message LocatedBlockProto {
required hadoop.common.TokenProto blockToken = 5;
repeated StorageTypeProto storageTypes = 6;
repeated string storageIDs = 7;
}
message DataEncryptionKeyProto {

View File

@ -61,8 +61,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -79,6 +79,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -86,6 +87,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
@ -856,6 +858,13 @@ public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
rackLocation);
}
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip) {
return new DatanodeStorageInfo(
getDatanodeDescriptor(ip, "defaultRack"),
new DatanodeStorage(storageID));
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
int port, String rackLocation) {
DatanodeID dnId = new DatanodeID(ipAddr, "host", "", port,

View File

@ -85,9 +85,8 @@ private static int getNumberOfRacks(final BlockManager blockManager,
final Set<String> rackSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
getCorruptReplicas(blockManager).getNodes(b);
for (Iterator<DatanodeDescriptor> it = blockManager.blocksMap.nodeIterator(b);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
String rackName = cur.getNetworkLocation();

View File

@ -31,18 +31,19 @@
public class TestBlockInfoUnderConstruction {
@Test
public void testInitializeBlockRecovery() throws Exception {
DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
"default");
DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
"default");
DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
"default");
DatanodeStorageInfo s1 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.1", "s1");
DatanodeDescriptor dd1 = s1.getDatanodeDescriptor();
DatanodeStorageInfo s2 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.2", "s2");
DatanodeDescriptor dd2 = s2.getDatanodeDescriptor();
DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3");
DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
3,
BlockUCState.UNDER_CONSTRUCTION,
new DatanodeDescriptor[] {dd1, dd2, dd3});
new DatanodeStorageInfo[] {s1, s2, s3});
// Recovery attempt #1.
long currentTime = System.currentTimeMillis();

View File

@ -20,7 +20,6 @@
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -162,10 +161,13 @@ public void testHeartbeatBlockRecovery() throws Exception {
dd1.setLastUpdate(System.currentTimeMillis());
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
final DatanodeStorageInfo[] storages = {
dd1.getStorageInfos().iterator().next(),
dd2.getStorageInfos().iterator().next(),
dd3.getStorageInfos().iterator().next()};
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
BlockUCState.UNDER_RECOVERY, storages);
dd1.addBlockToBeRecovered(blockInfo);
DatanodeCommand[] cmds =
NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
@ -187,8 +189,7 @@ public void testHeartbeatBlockRecovery() throws Exception {
dd3.setLastUpdate(System.currentTimeMillis());
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
BlockUCState.UNDER_RECOVERY, storages);
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);
@ -209,8 +210,7 @@ public void testHeartbeatBlockRecovery() throws Exception {
dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
blockInfo = new BlockInfoUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
BlockUCState.UNDER_RECOVERY,
new DatanodeDescriptor[] {dd1, dd2, dd3});
BlockUCState.UNDER_RECOVERY, storages);
dd1.addBlockToBeRecovered(blockInfo);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
assertEquals(1, cmds.length);

View File

@ -98,11 +98,9 @@ public void testNodeCount() throws Exception {
}
// find out a non-excess node
final Iterator<DatanodeDescriptor> iter = bm.blocksMap
.nodeIterator(block.getLocalBlock());
DatanodeDescriptor nonExcessDN = null;
while (iter.hasNext()) {
DatanodeDescriptor dn = iter.next();
for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
Collection<Block> blocks = bm.excessReplicateMap.get(dn.getStorageID());
if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
nonExcessDN = dn;

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time;
@ -71,6 +72,7 @@ public class TestReplicationPolicy {
private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[];
private static String[] storageIDs;
// The interval for marking a datanode as stale,
private static long staleInterval =
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
@ -1143,11 +1145,12 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
info.setBlockCollection(mbc);
bm.addBlockCollection(info, mbc);
DatanodeDescriptor[] dnAry = {dataNodes[0]};
DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
dataNodes[0], new DatanodeStorage("s1"))};
final BlockInfoUnderConstruction ucBlock =
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
dnAry);
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any()))
storageAry);
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);
bm.convertLastBlockToUnderConstruction(mbc);

View File

@ -48,7 +48,8 @@ public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
// but the block does not get put into the under-replicated blocks queue
final BlockManager bm = cluster.getNamesystem().getBlockManager();
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next();
DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock())
.iterator().next().getDatanodeDescriptor();
bm.addToInvalidates(b.getLocalBlock(), dn);
// Compute the invalidate work in NN, and trigger the heartbeat from DN
BlockManagerTestUtil.computeAllPendingWork(bm);

View File

@ -33,7 +33,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.Test;
@ -50,7 +50,7 @@ private FSNamesystem makeNameSystemSpy(Block block,
throws IOException {
Configuration conf = new Configuration();
FSImage image = new FSImage(conf);
DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
final DatanodeStorageInfo[] targets = {};
FSNamesystem namesystem = new FSNamesystem(conf, image);
FSNamesystem namesystemSpy = spy(namesystem);

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -523,16 +524,17 @@ private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
(BlockInfoUnderConstruction)storedBlock;
// We expect that the replica with the most recent heart beat will be
// the one to be in charge of the synchronization / recovery protocol.
DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
DatanodeDescriptor expectedPrimary = datanodes[0];
long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
for (int i = 1; i < datanodes.length; i++) {
if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
expectedPrimary = datanodes[i];
mostRecentLastUpdate = expectedPrimary.getLastUpdate();
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
DatanodeStorageInfo expectedPrimary = storages[0];
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
for (int i = 1; i < storages.length; i++) {
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
if (lastUpdate > mostRecentLastUpdate) {
expectedPrimary = storages[i];
mostRecentLastUpdate = lastUpdate;
}
}
return expectedPrimary;
return expectedPrimary.getDatanodeDescriptor();
}
private DistributedFileSystem createFsAsOtherUser(

View File

@ -702,9 +702,10 @@ void invoke() throws Exception {
DatanodeInfo[] newNodes = new DatanodeInfo[2];
newNodes[0] = nodes[0];
newNodes[1] = nodes[1];
String[] storageIDs = {"s0", "s1"};
client.getNamenode().updatePipeline(client.getClientName(), oldBlock,
newBlock, newNodes);
newBlock, newNodes, storageIDs);
out.close();
}
@ -714,10 +715,10 @@ boolean checkNamenodeBeforeReturn() throws Exception {
.getNamesystem(0).getFSDirectory().getINode4Write(file).asFile();
BlockInfoUnderConstruction blkUC =
(BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
int datanodeNum = blkUC.getExpectedLocations().length;
int datanodeNum = blkUC.getExpectedStorageLocations().length;
for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) {
Thread.sleep(1000);
datanodeNum = blkUC.getExpectedLocations().length;
datanodeNum = blkUC.getExpectedStorageLocations().length;
}
return datanodeNum == 2;
}