diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt index 916aace1530..5a6f2c76c4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt @@ -13,5 +13,7 @@ IMPROVEMENTS: HDFS-4987. Namenode changes to track multiple storages per datanode. (szetszwo) - HDFS-5154. Fix TestBlockManager and TestDatanodeDescriptor after HDFS-4987. + HDFS-5154. Fix TestBlockManager and TestDatanodeDescriptor after HDFS-4987. (Junping Du via szetszwo) + + HDFS-5009. Include storage information in the LocatedBlock. (szetszwo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 927c530556d..a9bcc16eb44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -85,7 +85,6 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.mortbay.log.Log; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; @@ -312,6 +311,8 @@ public class DFSOutputStream extends FSOutputSummer private DataInputStream blockReplyStream; private ResponseProcessor response = null; private volatile DatanodeInfo[] nodes = null; // list of targets for current block + //TODO: update storage IDs + private volatile String[] storageIDs = null; private LoadingCache excludedNodes = CacheBuilder.newBuilder() .expireAfterWrite( @@ -1039,7 +1040,8 @@ public class DFSOutputStream extends FSOutputSummer // update pipeline at the namenode ExtendedBlock newBlock = new ExtendedBlock( block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, nodes); + dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, + nodes, storageIDs); // update client side generation stamp block = newBlock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 5789c3615eb..66b6fbb0d90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -982,7 +982,7 @@ public interface ClientProtocol { */ @AtMostOnce public void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java index 93724d7569e..e26ca81d073 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.security.token.Token; /** @@ -35,6 +36,8 @@ public class LocatedBlock { private ExtendedBlock b; private long offset; // offset of the first byte of the block in the file private DatanodeInfo[] locs; + /** Storage ID for each replica */ + private String[] storageIDs; // Storage type for each replica, if reported. private StorageType[] storageTypes; // corrupt flag is true if all of the replicas of a block are corrupt. @@ -53,10 +56,22 @@ public class LocatedBlock { public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, long startOffset, boolean corrupt) { - this(b, locs, null, startOffset, corrupt); + this(b, locs, null, null, startOffset, corrupt); } - public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, + public static LocatedBlock createLocatedBlock(ExtendedBlock b, + DatanodeStorageInfo[] storages, long startOffset, boolean corrupt) { + final DatanodeInfo[] locs = new DatanodeInfo[storages.length]; + final String[] storageIDs = new String[storages.length]; + final StorageType[] storageType = new StorageType[storages.length]; + for(int i = 0; i < storages.length; i++) { + locs[i] = storages[i].getDatanodeDescriptor(); + storageIDs[i] = storages[i].getStorageID(); + storageType[i] = storages[i].getStorageType(); + } + return new LocatedBlock(b, locs, storageIDs, storageType, startOffset, corrupt); + } + public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, StorageType[] storageTypes, long startOffset, boolean corrupt) { this.b = b; @@ -67,6 +82,7 @@ public class LocatedBlock { } else { this.locs = locs; } + this.storageIDs = storageIDs; this.storageTypes = storageTypes; } @@ -94,6 +110,10 @@ public class LocatedBlock { return storageTypes; } + public String[] getStorageIDs() { + return storageIDs; + } + public long getStartOffset() { return offset; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index d7a18a60ac8..76ba6cab59a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -813,10 +813,12 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements UpdatePipelineRequestProto req) throws ServiceException { try { List newNodes = req.getNewNodesList(); - server - .updatePipeline(req.getClientName(), PBHelper.convert(req - .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper - .convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()]))); + List newStorageIDs = req.getStorageIDsList(); + server.updatePipeline(req.getClientName(), + PBHelper.convert(req.getOldBlock()), + PBHelper.convert(req.getNewBlock()), + PBHelper.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])), + newStorageIDs.toArray(new String[newStorageIDs.size()])); return VOID_UPDATEPIPELINE_RESPONSE; } catch (IOException e) { throw new ServiceException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index b5be61a3bcd..7bcd014372f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -780,12 +780,13 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) throws IOException { + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] storageIDs) throws IOException { UpdatePipelineRequestProto req = UpdatePipelineRequestProto.newBuilder() .setClientName(clientName) .setOldBlock(PBHelper.convert(oldBlock)) .setNewBlock(PBHelper.convert(newBlock)) .addAllNewNodes(Arrays.asList(PBHelper.convert(newNodes))) + .addAllStorageIDs(Arrays.asList(storageIDs)) .build(); try { rpcProxy.updatePipeline(null, req); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 3f015bdb09b..c82df6f8290 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -575,6 +575,8 @@ public class PBHelper { builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); } } + builder.addAllStorageIDs(Arrays.asList(b.getStorageIDs())); + return builder.setB(PBHelper.convert(b.getBlock())) .setBlockToken(PBHelper.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); @@ -602,6 +604,7 @@ public class PBHelper { } LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, + proto.getStorageIDsList().toArray(new String[proto.getStorageIDsCount()]), storageTypes, proto.getOffset(), proto.getCorrupt()); lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); return lb; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 7aa7721a022..9f7aabd4668 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -356,10 +356,10 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement { * @return BlockInfoUnderConstruction - an under construction block. */ public BlockInfoUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeDescriptor[] targets) { + BlockUCState s, DatanodeStorageInfo[] targets) { if(isComplete()) { - return new BlockInfoUnderConstruction( - this, getBlockCollection().getBlockReplication(), s, targets); + return new BlockInfoUnderConstruction(this, + getBlockCollection().getBlockReplication(), s, targets); } // the block is already under construction BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java index fc59acf10a6..e580c542969 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -64,12 +63,12 @@ public class BlockInfoUnderConstruction extends BlockInfo { * corresponding replicas. */ static class ReplicaUnderConstruction extends Block { - private DatanodeDescriptor expectedLocation; + private final DatanodeStorageInfo expectedLocation; private ReplicaState state; private boolean chosenAsPrimary; ReplicaUnderConstruction(Block block, - DatanodeDescriptor target, + DatanodeStorageInfo target, ReplicaState state) { super(block); this.expectedLocation = target; @@ -83,7 +82,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { * It is not guaranteed, but expected, that the data-node actually has * the replica. */ - DatanodeDescriptor getExpectedLocation() { + private DatanodeStorageInfo getExpectedStorageLocation() { return expectedLocation; } @@ -119,7 +118,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { * Is data-node the replica belongs to alive. */ boolean isAlive() { - return expectedLocation.isAlive; + return expectedLocation.getDatanodeDescriptor().isAlive; } @Override // Block @@ -163,7 +162,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { */ public BlockInfoUnderConstruction(Block blk, int replication, BlockUCState state, - DatanodeDescriptor[] targets) { + DatanodeStorageInfo[] targets) { super(blk, replication); assert getBlockUCState() != BlockUCState.COMPLETE : "BlockInfoUnderConstruction cannot be in COMPLETE state"; @@ -187,7 +186,7 @@ public class BlockInfoUnderConstruction extends BlockInfo { } /** Set expected locations */ - public void setExpectedLocations(DatanodeDescriptor[] targets) { + public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; this.replicas = new ArrayList(numLocations); for(int i = 0; i < numLocations; i++) @@ -199,12 +198,12 @@ public class BlockInfoUnderConstruction extends BlockInfo { * Create array of expected replica locations * (as has been assigned by chooseTargets()). */ - public DatanodeDescriptor[] getExpectedLocations() { + public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 0 : replicas.size(); - DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations]; + DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; for(int i = 0; i < numLocations; i++) - locations[i] = replicas.get(i).getExpectedLocation(); - return locations; + storages[i] = replicas.get(i).getExpectedStorageLocation(); + return storages; } /** Get the number of expected locations */ @@ -279,27 +278,29 @@ public class BlockInfoUnderConstruction extends BlockInfo { if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { continue; } - if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) { - primary = replicas.get(i); + final ReplicaUnderConstruction ruc = replicas.get(i); + final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate(); + if (lastUpdate > mostRecentLastUpdate) { primaryNodeIndex = i; - mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate(); + primary = ruc; + mostRecentLastUpdate = lastUpdate; } } if (primary != null) { - primary.getExpectedLocation().addBlockToBeRecovered(this); + primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); primary.setChosenAsPrimary(true); NameNode.blockStateChangeLog.info("BLOCK* " + this + " recovery started, primary=" + primary); } } - void addReplicaIfNotPresent(DatanodeDescriptor dn, + void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block, ReplicaState rState) { for(ReplicaUnderConstruction r : replicas) - if(r.getExpectedLocation() == dn) + if(r.getExpectedStorageLocation() == storage) return; - replicas.add(new ReplicaUnderConstruction(block, dn, rState)); + replicas.add(new ReplicaUnderConstruction(block, storage, rState)); } @Override // BlockInfo diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 77d75a4eb57..f26f0e953a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -500,9 +500,8 @@ public class BlockManager { Collection corruptNodes = corruptReplicas.getNodes(block); - for (Iterator jt = blocksMap.nodeIterator(block); - jt.hasNext();) { - DatanodeDescriptor node = jt.next(); + for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); String state = ""; if (corruptNodes != null && corruptNodes.contains(node)) { state = "(corrupt)"; @@ -662,10 +661,9 @@ public class BlockManager { assert oldBlock == getStoredBlock(oldBlock) : "last block of the file is not in blocksMap"; - DatanodeDescriptor[] targets = getNodes(oldBlock); + DatanodeStorageInfo[] targets = getStorages(oldBlock); - BlockInfoUnderConstruction ucBlock = - bc.setLastBlock(oldBlock, targets); + BlockInfoUnderConstruction ucBlock = bc.setLastBlock(oldBlock, targets); blocksMap.replaceBlock(ucBlock); // Remove block from replication queue. @@ -675,9 +673,8 @@ public class BlockManager { pendingReplications.remove(ucBlock); // remove this block from the list of pending blocks to be deleted. - for (DatanodeDescriptor dd : targets) { - String datanodeId = dd.getStorageID(); - invalidateBlocks.remove(datanodeId, oldBlock); + for (DatanodeStorageInfo storage : targets) { + invalidateBlocks.remove(storage.getStorageID(), oldBlock); } // Adjust safe-mode totals, since under-construction blocks don't @@ -699,9 +696,8 @@ public class BlockManager { private List getValidLocations(Block block) { ArrayList machineSet = new ArrayList(blocksMap.numNodes(block)); - for(Iterator it = - blocksMap.nodeIterator(block); it.hasNext();) { - String storageID = it.next().getStorageID(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final String storageID = storage.getStorageID(); // filter invalidate replicas if(!invalidateBlocks.contains(storageID, block)) { machineSet.add(storageID); @@ -775,9 +771,9 @@ public class BlockManager { + ", blk=" + blk); } final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)blk; - final DatanodeDescriptor[] locations = uc.getExpectedLocations(); + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return new LocatedBlock(eb, locations, pos, false); + return LocatedBlock.createLocatedBlock(eb, storages, pos, false); } // get block locations @@ -795,9 +791,8 @@ public class BlockManager { final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines]; int j = 0; if (numMachines > 0) { - for(Iterator it = blocksMap.nodeIterator(blk); - it.hasNext();) { - final DatanodeDescriptor d = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { + final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); if (isCorrupt || (!isCorrupt && !replicaCorrupt)) machines[j++] = d; @@ -1017,9 +1012,8 @@ public class BlockManager { */ private void addToInvalidates(Block b) { StringBuilder datanodes = new StringBuilder(); - for (Iterator it = blocksMap.nodeIterator(b); it - .hasNext();) { - DatanodeDescriptor node = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); invalidateBlocks.add(b, node, false); datanodes.append(node).append(" "); } @@ -1466,10 +1460,10 @@ public class BlockManager { int decommissioned = 0; int corrupt = 0; int excess = 0; - Iterator it = blocksMap.nodeIterator(block); + Collection nodesCorrupt = corruptReplicas.getNodes(block); - while(it.hasNext()) { - DatanodeDescriptor node = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getStorageID()); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) @@ -1790,7 +1784,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( - node, iblk, reportedState); + node.getStorageInfo(storageID), iblk, reportedState); //and fall through to next clause } //add replica if appropriate @@ -2093,7 +2087,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block DatanodeDescriptor node, String storageID, ReplicaState reportedState) throws IOException { - block.addReplicaIfNotPresent(node, block, reportedState); + block.addReplicaIfNotPresent(node.getStorageInfo(storageID), block, reportedState); if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { addStoredBlock(block, node, storageID, null, true); } @@ -2425,9 +2419,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block Collection nonExcess = new ArrayList(); Collection corruptNodes = corruptReplicas .getNodes(block); - for (Iterator it = blocksMap.nodeIterator(block); - it.hasNext();) { - DatanodeDescriptor cur = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (cur.areBlockContentsStale()) { LOG.info("BLOCK* processOverReplicatedBlock: " + "Postponing processing of over-replicated " + @@ -2747,10 +2740,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block int corrupt = 0; int excess = 0; int stale = 0; - Iterator nodeIter = blocksMap.nodeIterator(b); Collection nodesCorrupt = corruptReplicas.getNodes(b); - while (nodeIter.hasNext()) { - DatanodeDescriptor node = nodeIter.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { corrupt++; } else if (node.isDecommissionInProgress() || node.isDecommissioned()) { @@ -2787,10 +2779,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } // else proceed with fast case int live = 0; - Iterator nodeIter = blocksMap.nodeIterator(b); Collection nodesCorrupt = corruptReplicas.getNodes(b); - while (nodeIter.hasNext()) { - DatanodeDescriptor node = nodeIter.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node))) live++; } @@ -2802,10 +2793,9 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block int curReplicas = num.liveReplicas(); int curExpectedReplicas = getReplication(block); BlockCollection bc = blocksMap.getBlockCollection(block); - Iterator nodeIter = blocksMap.nodeIterator(block); StringBuilder nodeList = new StringBuilder(); - while (nodeIter.hasNext()) { - DatanodeDescriptor node = nodeIter.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); nodeList.append(node); nodeList.append(" "); } @@ -2902,14 +2892,13 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block return blocksMap.size(); } - public DatanodeDescriptor[] getNodes(BlockInfo block) { - DatanodeDescriptor[] nodes = - new DatanodeDescriptor[block.numNodes()]; - Iterator it = blocksMap.nodeIterator(block); - for (int i = 0; it != null && it.hasNext(); i++) { - nodes[i] = it.next(); + public DatanodeStorageInfo[] getStorages(BlockInfo block) { + final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()]; + int i = 0; + for(DatanodeStorageInfo s : blocksMap.getStorages(block)) { + storages[i++] = s; } - return nodes; + return storages; } public int getTotalBlocks() { @@ -3038,9 +3027,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block corruptReplicas.getNodes(b); int numExpectedReplicas = getReplication(b); String rackName = null; - for (Iterator it = blocksMap.nodeIterator(b); - it.hasNext();) { - DatanodeDescriptor cur = it.next(); + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { if (numExpectedReplicas == 1 || @@ -3084,8 +3072,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block } /** @return an iterator of the datanodes. */ - public Iterator datanodeIterator(final Block block) { - return blocksMap.nodeIterator(block); + public Iterable getStorages(final Block block) { + return blocksMap.getStorages(block); } public int numCorruptReplicas(Block block) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 1e454c9bc86..8e9f706f6fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -29,11 +29,11 @@ import org.apache.hadoop.util.LightWeightGSet; * the datanodes that store the block. */ class BlocksMap { - private static class NodeIterator implements Iterator { + private static class StorageIterator implements Iterator { private BlockInfo blockInfo; private int nextIdx = 0; - NodeIterator(BlockInfo blkInfo) { + StorageIterator(BlockInfo blkInfo) { this.blockInfo = blkInfo; } @@ -44,8 +44,8 @@ class BlocksMap { } @Override - public DatanodeDescriptor next() { - return blockInfo.getDatanode(nextIdx++); + public DatanodeStorageInfo next() { + return blockInfo.getStorageInfo(nextIdx++); } @Override @@ -115,18 +115,23 @@ class BlocksMap { /** * Searches for the block in the BlocksMap and - * returns Iterator that iterates through the nodes the block belongs to. + * returns {@link Iterable} that iterates through the nodes the block belongs to. */ - Iterator nodeIterator(Block b) { - return nodeIterator(blocks.get(b)); + Iterable getStorages(Block b) { + return getStorages(blocks.get(b)); } /** * For a block that has already been retrieved from the BlocksMap - * returns Iterator that iterates through the nodes the block belongs to. + * returns {@link Iterable} that iterates through the nodes the block belongs to. */ - Iterator nodeIterator(BlockInfo storedBlock) { - return new NodeIterator(storedBlock); + Iterable getStorages(final BlockInfo storedBlock) { + return new Iterable() { + @Override + public Iterator iterator() { + return new StorageIterator(storedBlock); + } + }; } /** counts number of containing nodes. Better than using iterator. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index e9f4fdb636b..62235a7bba1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -23,7 +23,13 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,10 +54,17 @@ import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet; import org.apache.hadoop.hdfs.server.namenode.HostFileManager.MutableEntrySet; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; +import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.util.CyclicIteration; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -423,6 +436,20 @@ public class DatanodeManager { return node; } + public DatanodeStorageInfo[] getDatanodeStorageInfos( + DatanodeID[] datanodeID, String[] storageIDs) + throws UnregisteredNodeException { + if (datanodeID.length == 0) { + return null; + } + final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length]; + for(int i = 0; i < datanodeID.length; i++) { + final DatanodeDescriptor dd = getDatanode(datanodeID[i]); + storages[i] = dd.getStorageInfo(storageIDs[i]); + } + return storages; + } + /** Prints information about all datanodes. */ void datanodeDump(final PrintWriter out) { synchronized (datanodeMap) { @@ -1151,32 +1178,32 @@ public class DatanodeManager { BlockRecoveryCommand brCommand = new BlockRecoveryCommand( blocks.length); for (BlockInfoUnderConstruction b : blocks) { - DatanodeDescriptor[] expectedLocations = b.getExpectedLocations(); + final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). - List recoveryLocations = - new ArrayList(expectedLocations.length); - for (int i = 0; i < expectedLocations.length; i++) { - if (!expectedLocations[i].isStale(this.staleInterval)) { - recoveryLocations.add(expectedLocations[i]); + final List recoveryLocations = + new ArrayList(storages.length); + for (int i = 0; i < storages.length; i++) { + if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) { + recoveryLocations.add(storages[i]); } } // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. if (recoveryLocations.size() > 1) { - if (recoveryLocations.size() != expectedLocations.length) { + if (recoveryLocations.size() != storages.length) { LOG.info("Skipped stale nodes for recovery : " + - (expectedLocations.length - recoveryLocations.size())); + (storages.length - recoveryLocations.size())); } brCommand.add(new RecoveringBlock( new ExtendedBlock(blockPoolId, b), - recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]), + DatanodeStorageInfo.toDatanodeInfos(recoveryLocations), b.getBlockRecoveryId())); } else { // If too many replicas are stale, then choose all replicas to participate // in block recovery. brCommand.add(new RecoveringBlock( new ExtendedBlock(blockPoolId, b), - expectedLocations, + DatanodeStorageInfo.toDatanodeInfos(storages), b.getBlockRecoveryId())); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 805e334acbf..53fcde1c691 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -17,9 +17,12 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Arrays; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -29,6 +32,17 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; * by this class. */ public class DatanodeStorageInfo { + static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) { + return toDatanodeInfos(Arrays.asList(storages)); + } + static DatanodeInfo[] toDatanodeInfos(List storages) { + final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()]; + for(int i = 0; i < storages.size(); i++) { + datanodes[i] = storages.get(i).getDatanodeDescriptor(); + } + return datanodes; + } + /** * Iterates over the list of blocks belonging to the data-node. */ @@ -65,7 +79,7 @@ public class DatanodeStorageInfo { private long remaining; private volatile BlockInfo blockList = null; - DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { + public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) { this.dn = dn; this.storageID = s.getStorageID(); this.storageType = s.getStorageType(); @@ -92,6 +106,10 @@ public class DatanodeStorageInfo { return storageID; } + public StorageType getStorageType() { + return storageType; + } + public long getCapacity() { return capacity; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java index 41975d3371d..5d63fefaaf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java @@ -34,5 +34,5 @@ public interface MutableBlockCollection extends BlockCollection { * and set the locations. */ public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock, - DatanodeDescriptor[] locations) throws IOException; + DatanodeStorageInfo[] storages) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 51642a8b23a..e52560b6b73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount; @@ -345,7 +346,7 @@ public class FSDirectory implements Closeable { * Add a block to the file. Returns a reference to the added block. */ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block, - DatanodeDescriptor targets[]) throws IOException { + DatanodeStorageInfo[] targets) throws IOException { waitForReady(); writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d83d53a5f35..94ae9d8cc72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -168,6 +168,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.OutOfV1GenerationStampsException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; @@ -2484,8 +2485,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } // choose targets for the new block to be allocated. - final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( + // TODO: chooseTarget(..) should be changed to return DatanodeStorageInfo's + final DatanodeDescriptor chosenDatanodes[] = getBlockManager().chooseTarget( src, replication, clientNode, excludedNodes, blockSize, favoredNodes); + final DatanodeStorageInfo[] targets = new DatanodeStorageInfo[chosenDatanodes.length]; + for(int i = 0; i < targets.length; i++) { + final DatanodeDescriptor dd = chosenDatanodes[i]; + targets[i] = dd.getStorageInfos().iterator().next(); + } // Part II. // Allocate a new block, add it to the INode and the BlocksMap. @@ -2607,7 +2614,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, src + ". Returning previously allocated block " + lastBlockInFile); long offset = pendingFile.computeFileSize(); onRetryBlock[0] = makeLocatedBlock(lastBlockInFile, - ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(), + ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedStorageLocations(), offset); return iip; } else { @@ -2625,11 +2632,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return iip; } - LocatedBlock makeLocatedBlock(Block blk, - DatanodeInfo[] locs, + LocatedBlock makeLocatedBlock(Block blk, DatanodeStorageInfo[] locs, long offset) throws IOException { - LocatedBlock lBlk = new LocatedBlock( - getExtendedBlock(blk), locs, offset); + LocatedBlock lBlk = LocatedBlock.createLocatedBlock( + getExtendedBlock(blk), locs, offset, false); getBlockManager().setBlockToken( lBlk, BlockTokenSecretManager.AccessMode.WRITE); return lBlk; @@ -2852,13 +2858,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws QuotaExceededException If addition of block exceeds space quota */ BlockInfo saveAllocatedBlock(String src, INodesInPath inodesInPath, - Block newBlock, DatanodeDescriptor targets[]) throws IOException { + Block newBlock, DatanodeStorageInfo[] targets) + throws IOException { assert hasWriteLock(); BlockInfo b = dir.addBlock(src, inodesInPath, newBlock, targets); NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". " + getBlockPoolId() + " " + b); - for (DatanodeDescriptor dn : targets) { - dn.incBlocksScheduled(); + for (DatanodeStorageInfo storage : targets) { + storage.getDatanodeDescriptor().incBlocksScheduled(); } return b; } @@ -3622,7 +3629,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock; // setup the last block locations from the blockManager if not known if (uc.getNumExpectedLocations() == 0) { - uc.setExpectedLocations(blockManager.getNodes(lastBlock)); + uc.setExpectedLocations(blockManager.getStorages(lastBlock)); } // start recovery of the last block for this file long blockRecoveryId = nextGenerationStamp(isLegacyBlock(uc)); @@ -3777,24 +3784,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // find the DatanodeDescriptor objects // There should be no locations in the blockManager till now because the // file is underConstruction - DatanodeDescriptor[] descriptors = null; - if (newtargets.length > 0) { - descriptors = new DatanodeDescriptor[newtargets.length]; - for(int i = 0; i < newtargets.length; i++) { - descriptors[i] = blockManager.getDatanodeManager().getDatanode( - newtargets[i]); - } - } - if ((closeFile) && (descriptors != null)) { + final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager() + .getDatanodeStorageInfos(newtargets, newtargetstorages); + if (closeFile && storages != null) { // the file is getting closed. Insert block locations into blockManager. // Otherwise fsck will report these blocks as MISSING, especially if the // blocksReceived from Datanodes take a long time to arrive. - for (int i = 0; i < descriptors.length; i++) { - descriptors[i].addBlock(newtargetstorages[i], storedBlock); + for (int i = 0; i < storages.length; i++) { + storages[i].addBlock(storedBlock); } } // add pipeline locations into the INodeUnderConstruction - pendingFile.setLastBlock(storedBlock, descriptors); + pendingFile.setLastBlock(storedBlock, storages); } if (closeFile) { @@ -5639,7 +5640,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException if any error occurs */ void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) throws IOException { CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache); if (cacheEntry != null && cacheEntry.isSuccess()) { @@ -5662,7 +5663,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and " + oldBlock + " has different block identifier"; updatePipelineInternal(clientName, oldBlock, newBlock, newNodes, - cacheEntry != null); + newStorageIDs, cacheEntry != null); success = true; } finally { writeUnlock(); @@ -5674,7 +5675,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** @see #updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes, boolean logRetryCache) + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs, + boolean logRetryCache) throws IOException { assert hasWriteLock(); // check the vadility of the block and lease holder name @@ -5698,15 +5700,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, blockinfo.setNumBytes(newBlock.getNumBytes()); // find the DatanodeDescriptor objects - final DatanodeManager dm = getBlockManager().getDatanodeManager(); - DatanodeDescriptor[] descriptors = null; - if (newNodes.length > 0) { - descriptors = new DatanodeDescriptor[newNodes.length]; - for(int i = 0; i < newNodes.length; i++) { - descriptors[i] = dm.getDatanode(newNodes[i]); - } - } - blockinfo.setExpectedLocations(descriptors); + final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager() + .getDatanodeStorageInfos(newNodes, newStorageIDs); + blockinfo.setExpectedLocations(storages); String src = leaseManager.findPath(pendingFile); dir.persistBlocks(src, pendingFile, logRetryCache); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java index 9dd38ff320b..2f9fbb0af7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot; @@ -180,7 +181,7 @@ public class INodeFileUnderConstruction extends INodeFile implements MutableBloc */ @Override public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock, - DatanodeDescriptor[] targets) throws IOException { + DatanodeStorageInfo[] targets) throws IOException { if (numBlocks() == 0) { throw new IOException("Failed to set last block: File is empty."); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 62a17a96f5b..defe743898f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -611,9 +611,9 @@ class NameNodeRpcServer implements NamenodeProtocols { @Override // ClientProtocol public void updatePipeline(String clientName, ExtendedBlock oldBlock, - ExtendedBlock newBlock, DatanodeID[] newNodes) + ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs) throws IOException { - namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); + namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes, newStorageIDs); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java index 1989784ba53..aabc073c899 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; @@ -1065,11 +1066,10 @@ class NamenodeJspHelper { } doc.startTag("replicas"); - for(final Iterator it = blockManager.datanodeIterator(block); - it.hasNext(); ) { + for(DatanodeStorageInfo storage : blockManager.getStorages(block)) { doc.startTag("replica"); - DatanodeDescriptor dd = it.next(); + DatanodeDescriptor dd = storage.getDatanodeDescriptor(); doc.startTag("host_name"); doc.pcdata(dd.getHostName()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index 95fcc50ebd3..3f09aeeb3c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -437,6 +437,7 @@ message UpdatePipelineRequestProto { required ExtendedBlockProto oldBlock = 2; required ExtendedBlockProto newBlock = 3; repeated DatanodeIDProto newNodes = 4; + repeated string storageIDs = 5; } message UpdatePipelineResponseProto { // void response diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto index 9709e888480..da0b8322aa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto @@ -134,6 +134,7 @@ message LocatedBlockProto { required hadoop.common.TokenProto blockToken = 5; repeated StorageTypeProto storageTypes = 6; + repeated string storageIDs = 7; } message DataEncryptionKeyProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index f0c10b0a2fe..696a320e23c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -61,8 +61,8 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.Options.Rename; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -79,6 +79,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -86,6 +87,7 @@ import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; @@ -856,6 +858,13 @@ public class DFSTestUtil { rackLocation); } + public static DatanodeStorageInfo createDatanodeStorageInfo( + String storageID, String ip) { + return new DatanodeStorageInfo( + getDatanodeDescriptor(ip, "defaultRack"), + new DatanodeStorage(storageID)); + } + public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, int port, String rackLocation) { DatanodeID dnId = new DatanodeID(ipAddr, "host", "", port, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 006ee42bb58..2f659417479 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -85,9 +85,8 @@ public class BlockManagerTestUtil { final Set rackSet = new HashSet(0); final Collection corruptNodes = getCorruptReplicas(blockManager).getNodes(b); - for (Iterator it = blockManager.blocksMap.nodeIterator(b); - it.hasNext();) { - DatanodeDescriptor cur = it.next(); + for(DatanodeStorageInfo storage : blockManager.blocksMap.getStorages(b)) { + final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { String rackName = cur.getNetworkLocation(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java index cafc8227147..703d344347a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java @@ -31,18 +31,19 @@ import org.junit.Test; public class TestBlockInfoUnderConstruction { @Test public void testInitializeBlockRecovery() throws Exception { - DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1", - "default"); - DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2", - "default"); - DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3", - "default"); + DatanodeStorageInfo s1 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.1", "s1"); + DatanodeDescriptor dd1 = s1.getDatanodeDescriptor(); + DatanodeStorageInfo s2 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.2", "s2"); + DatanodeDescriptor dd2 = s2.getDatanodeDescriptor(); + DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3"); + DatanodeDescriptor dd3 = s3.getDatanodeDescriptor(); + dd1.isAlive = dd2.isAlive = dd3.isAlive = true; BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction( new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3, BlockUCState.UNDER_CONSTRUCTION, - new DatanodeDescriptor[] {dd1, dd2, dd3}); + new DatanodeStorageInfo[] {s1, s2, s3}); // Recovery attempt #1. long currentTime = System.currentTimeMillis(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index bbb83070a30..fc5ce60b4d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import java.util.ArrayList; -import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -162,10 +161,13 @@ public class TestHeartbeatHandling { dd1.setLastUpdate(System.currentTimeMillis()); dd2.setLastUpdate(System.currentTimeMillis()); dd3.setLastUpdate(System.currentTimeMillis()); + final DatanodeStorageInfo[] storages = { + dd1.getStorageInfos().iterator().next(), + dd2.getStorageInfos().iterator().next(), + dd3.getStorageInfos().iterator().next()}; BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction( new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3, - BlockUCState.UNDER_RECOVERY, - new DatanodeDescriptor[] {dd1, dd2, dd3}); + BlockUCState.UNDER_RECOVERY, storages); dd1.addBlockToBeRecovered(blockInfo); DatanodeCommand[] cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands(); @@ -187,8 +189,7 @@ public class TestHeartbeatHandling { dd3.setLastUpdate(System.currentTimeMillis()); blockInfo = new BlockInfoUnderConstruction( new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3, - BlockUCState.UNDER_RECOVERY, - new DatanodeDescriptor[] {dd1, dd2, dd3}); + BlockUCState.UNDER_RECOVERY, storages); dd1.addBlockToBeRecovered(blockInfo); cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands(); assertEquals(1, cmds.length); @@ -209,8 +210,7 @@ public class TestHeartbeatHandling { dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000); blockInfo = new BlockInfoUnderConstruction( new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3, - BlockUCState.UNDER_RECOVERY, - new DatanodeDescriptor[] {dd1, dd2, dd3}); + BlockUCState.UNDER_RECOVERY, storages); dd1.addBlockToBeRecovered(blockInfo); cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands(); assertEquals(1, cmds.length); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index 86994bed3ac..d66c9254e5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -98,11 +98,9 @@ public class TestNodeCount { } // find out a non-excess node - final Iterator iter = bm.blocksMap - .nodeIterator(block.getLocalBlock()); DatanodeDescriptor nonExcessDN = null; - while (iter.hasNext()) { - DatanodeDescriptor dn = iter.next(); + for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) { + final DatanodeDescriptor dn = storage.getDatanodeDescriptor(); Collection blocks = bm.excessReplicateMap.get(dn.getStorageID()); if (blocks == null || !blocks.contains(block.getLocalBlock()) ) { nonExcessDN = dn; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 547e8536f2b..ee86e9dac44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.Time; @@ -71,6 +72,7 @@ public class TestReplicationPolicy { private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; private static DatanodeDescriptor dataNodes[]; + private static String[] storageIDs; // The interval for marking a datanode as stale, private static long staleInterval = DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; @@ -1143,11 +1145,12 @@ public class TestReplicationPolicy { info.setBlockCollection(mbc); bm.addBlockCollection(info, mbc); - DatanodeDescriptor[] dnAry = {dataNodes[0]}; + DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo( + dataNodes[0], new DatanodeStorage("s1"))}; final BlockInfoUnderConstruction ucBlock = info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, - dnAry); - when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any())) + storageAry); + when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any())) .thenReturn(ucBlock); bm.convertLastBlockToUnderConstruction(mbc); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java index 7decc4b8978..210f0e90421 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java @@ -48,7 +48,8 @@ public class TestUnderReplicatedBlocks { // but the block does not get put into the under-replicated blocks queue final BlockManager bm = cluster.getNamesystem().getBlockManager(); ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH); - DatanodeDescriptor dn = bm.blocksMap.nodeIterator(b.getLocalBlock()).next(); + DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock()) + .iterator().next().getDatanodeDescriptor(); bm.addToInvalidates(b.getLocalBlock(), dn); // Compute the invalidate work in NN, and trigger the heartbeat from DN BlockManagerTestUtil.computeAllPendingWork(bm); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java index 53007196200..23bd4994a37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.junit.Test; @@ -50,7 +50,7 @@ public class TestCommitBlockSynchronization { throws IOException { Configuration conf = new Configuration(); FSImage image = new FSImage(conf); - DatanodeDescriptor[] targets = new DatanodeDescriptor[0]; + final DatanodeStorageInfo[] targets = {}; FSNamesystem namesystem = new FSNamesystem(conf, image); FSNamesystem namesystemSpy = spy(namesystem); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index 3c8b91be8d2..7c87ab0d664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -523,16 +524,17 @@ public class TestPipelinesFailover { (BlockInfoUnderConstruction)storedBlock; // We expect that the replica with the most recent heart beat will be // the one to be in charge of the synchronization / recovery protocol. - DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations(); - DatanodeDescriptor expectedPrimary = datanodes[0]; - long mostRecentLastUpdate = expectedPrimary.getLastUpdate(); - for (int i = 1; i < datanodes.length; i++) { - if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) { - expectedPrimary = datanodes[i]; - mostRecentLastUpdate = expectedPrimary.getLastUpdate(); + final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations(); + DatanodeStorageInfo expectedPrimary = storages[0]; + long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate(); + for (int i = 1; i < storages.length; i++) { + final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate(); + if (lastUpdate > mostRecentLastUpdate) { + expectedPrimary = storages[i]; + mostRecentLastUpdate = lastUpdate; } } - return expectedPrimary; + return expectedPrimary.getDatanodeDescriptor(); } private DistributedFileSystem createFsAsOtherUser( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java index dff44a0c89c..e907f7669af 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java @@ -702,9 +702,10 @@ public class TestRetryCacheWithHA { DatanodeInfo[] newNodes = new DatanodeInfo[2]; newNodes[0] = nodes[0]; newNodes[1] = nodes[1]; + String[] storageIDs = {"s0", "s1"}; client.getNamenode().updatePipeline(client.getClientName(), oldBlock, - newBlock, newNodes); + newBlock, newNodes, storageIDs); out.close(); } @@ -714,10 +715,10 @@ public class TestRetryCacheWithHA { .getNamesystem(0).getFSDirectory().getINode4Write(file).asFile(); BlockInfoUnderConstruction blkUC = (BlockInfoUnderConstruction) (fileNode.getBlocks())[1]; - int datanodeNum = blkUC.getExpectedLocations().length; + int datanodeNum = blkUC.getExpectedStorageLocations().length; for (int i = 0; i < CHECKTIMES && datanodeNum != 2; i++) { Thread.sleep(1000); - datanodeNum = blkUC.getExpectedLocations().length; + datanodeNum = blkUC.getExpectedStorageLocations().length; } return datanodeNum == 2; }