HDFS-6794: Merging r1615169 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1615175 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-08-01 17:35:14 +00:00
parent 91794728d5
commit 1b4f028385
10 changed files with 128 additions and 104 deletions

View File

@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6796. Improve the argument check during balancer command line parsing. HDFS-6796. Improve the argument check during balancer command line parsing.
(Benoy Antony via szetszwo) (Benoy Antony via szetszwo)
HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo
where possible (Arpit Agarwal)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -1079,6 +1079,7 @@ public class BlockManager {
* Mark the block belonging to datanode as corrupt * Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt * @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica * @param dn Datanode which holds the corrupt replica
* @param storageID if known, null otherwise.
* @param reason a textual reason why the block should be marked corrupt, * @param reason a textual reason why the block should be marked corrupt,
* for logging purposes * for logging purposes
*/ */
@ -1095,19 +1096,29 @@ public class BlockManager {
+ blk + " not found"); + blk + " not found");
return; return;
} }
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
dn, storageID);
}
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeInfo dn, String storageID) throws IOException {
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) { if (node == null) {
throw new IOException("Cannot mark " + b throw new IOException("Cannot mark " + blk
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist"); + ") does not exist");
} }
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
storageID == null ? null : node.getStorageInfo(storageID),
node);
}
/**
*
* @param b
* @param storageInfo storage that contains the block, if known. null otherwise.
* @throws IOException
*/
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
DatanodeStorageInfo storageInfo,
DatanodeDescriptor node) throws IOException {
BlockCollection bc = b.corrupted.getBlockCollection(); BlockCollection bc = b.corrupted.getBlockCollection();
if (bc == null) { if (bc == null) {
@ -1118,7 +1129,9 @@ public class BlockManager {
} }
// Add replica to the data-node if it is not already there // Add replica to the data-node if it is not already there
node.addBlock(storageID, b.stored); if (storageInfo != null) {
storageInfo.addBlock(b.stored);
}
// Add this replica to corruptReplicas Map // Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
@ -1460,7 +1473,7 @@ public class BlockManager {
* @throws IOException * @throws IOException
* if the number of targets < minimum replication. * if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node, * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
* List, boolean, Set, long) * List, boolean, Set, long, StorageType)
*/ */
public DatanodeStorageInfo[] chooseTarget(final String src, public DatanodeStorageInfo[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client, final int numOfReplicas, final DatanodeDescriptor client,
@ -1697,7 +1710,7 @@ public class BlockManager {
* @throws IOException * @throws IOException
*/ */
public boolean processReport(final DatanodeID nodeID, public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage, final String poolId, final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException { final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock(); namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock final long startTime = Time.now(); //after acquiring write lock
@ -1729,9 +1742,9 @@ public class BlockManager {
if (storageInfo.numBlocks() == 0) { if (storageInfo.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than // The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times. // ordinary block reports. This shortens restart times.
processFirstBlockReport(node, storage.getStorageID(), newReport); processFirstBlockReport(storageInfo, newReport);
} else { } else {
processReport(node, storage, newReport); processReport(storageInfo, newReport);
} }
// Now that we have an up-to-date block report, we know that any // Now that we have an up-to-date block report, we know that any
@ -1793,9 +1806,8 @@ public class BlockManager {
} }
} }
private void processReport(final DatanodeDescriptor node, private void processReport(final DatanodeStorageInfo storageInfo,
final DatanodeStorage storage, final BlockListAsLongs report) throws IOException {
final BlockListAsLongs report) throws IOException {
// Normal case: // Normal case:
// Modify the (block-->datanode) map, according to the difference // Modify the (block-->datanode) map, according to the difference
// between the old and new block report. // between the old and new block report.
@ -1805,19 +1817,20 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
reportDiff(node, storage, report, reportDiff(storageInfo, report,
toAdd, toRemove, toInvalidate, toCorrupt, toUC); toAdd, toRemove, toInvalidate, toCorrupt, toUC);
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue // Process the blocks on each queue
for (StatefulBlockInfo b : toUC) { for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, node, storage.getStorageID()); addStoredBlockUnderConstruction(b, storageInfo);
} }
for (Block b : toRemove) { for (Block b : toRemove) {
removeStoredBlock(b, node); removeStoredBlock(b, node);
} }
int numBlocksLogged = 0; int numBlocksLogged = 0;
for (BlockInfo b : toAdd) { for (BlockInfo b : toAdd) {
addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++; numBlocksLogged++;
} }
if (numBlocksLogged > maxNumBlocksToLog) { if (numBlocksLogged > maxNumBlocksToLog) {
@ -1831,7 +1844,7 @@ public class BlockManager {
addToInvalidates(b, node); addToInvalidates(b, node);
} }
for (BlockToMarkCorrupt b : toCorrupt) { for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node, storage.getStorageID()); markBlockAsCorrupt(b, storageInfo, node);
} }
} }
@ -1842,16 +1855,16 @@ public class BlockManager {
* a toRemove list (since there won't be any). It also silently discards * a toRemove list (since there won't be any). It also silently discards
* any invalid blocks, thereby deferring their processing until * any invalid blocks, thereby deferring their processing until
* the next block report. * the next block report.
* @param node - DatanodeDescriptor of the node that sent the report * @param storageInfo - DatanodeStorageInfo that sent the report
* @param report - the initial block report, to be processed * @param report - the initial block report, to be processed
* @throws IOException * @throws IOException
*/ */
private void processFirstBlockReport(final DatanodeDescriptor node, private void processFirstBlockReport(
final String storageID, final DatanodeStorageInfo storageInfo,
final BlockListAsLongs report) throws IOException { final BlockListAsLongs report) throws IOException {
if (report == null) return; if (report == null) return;
assert (namesystem.hasWriteLock()); assert (namesystem.hasWriteLock());
assert (node.getStorageInfo(storageID).numBlocks() == 0); assert (storageInfo.numBlocks() == 0);
BlockReportIterator itBR = report.getBlockReportIterator(); BlockReportIterator itBR = report.getBlockReportIterator();
while(itBR.hasNext()) { while(itBR.hasNext()) {
@ -1860,7 +1873,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture && if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) { namesystem.isGenStampInFuture(iblk)) {
queueReportedBlock(node, storageID, iblk, reportedState, queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP); QUEUE_REASON_FUTURE_GENSTAMP);
continue; continue;
} }
@ -1872,15 +1885,16 @@ public class BlockManager {
// If block is corrupt, mark it and continue to next block. // If block is corrupt, mark it and continue to next block.
BlockUCState ucState = storedBlock.getBlockUCState(); BlockUCState ucState = storedBlock.getBlockUCState();
BlockToMarkCorrupt c = checkReplicaCorrupt( BlockToMarkCorrupt c = checkReplicaCorrupt(
iblk, reportedState, storedBlock, ucState, node); iblk, reportedState, storedBlock, ucState,
storageInfo.getDatanodeDescriptor());
if (c != null) { if (c != null) {
if (shouldPostponeBlocksFromFuture) { if (shouldPostponeBlocksFromFuture) {
// In the Standby, we may receive a block report for a file that we // In the Standby, we may receive a block report for a file that we
// just have an out-of-date gen-stamp or state for, for example. // just have an out-of-date gen-stamp or state for, for example.
queueReportedBlock(node, storageID, iblk, reportedState, queueReportedBlock(storageInfo, iblk, reportedState,
QUEUE_REASON_CORRUPT_STATE); QUEUE_REASON_CORRUPT_STATE);
} else { } else {
markBlockAsCorrupt(c, node, storageID); markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
} }
continue; continue;
} }
@ -1888,7 +1902,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list // If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
node.getStorageInfo(storageID), iblk, reportedState); storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode // OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode // threshold. So we need to update such blocks to safemode
// refer HDFS-5283 // refer HDFS-5283
@ -1901,12 +1915,12 @@ public class BlockManager {
} }
//add replica if appropriate //add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) { if (reportedState == ReplicaState.FINALIZED) {
addStoredBlockImmediate(storedBlock, node, storageID); addStoredBlockImmediate(storedBlock, storageInfo);
} }
} }
} }
private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport, BlockListAsLongs newReport,
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor Collection<Block> toRemove, // remove from DatanodeDescriptor
@ -1914,8 +1928,6 @@ public class BlockManager {
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list Collection<StatefulBlockInfo> toUC) { // add to under-construction list
final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
// place a delimiter in the list which separates blocks // place a delimiter in the list which separates blocks
// that have been reported from those that have not // that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1); BlockInfo delimiter = new BlockInfo(new Block(), 1);
@ -1932,7 +1944,7 @@ public class BlockManager {
while(itBR.hasNext()) { while(itBR.hasNext()) {
Block iblk = itBR.next(); Block iblk = itBR.next();
ReplicaState iState = itBR.getCurrentReplicaState(); ReplicaState iState = itBR.getCurrentReplicaState();
BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list // move block to the head of the list
@ -1969,7 +1981,7 @@ public class BlockManager {
* BlockInfoUnderConstruction's list of replicas.</li> * BlockInfoUnderConstruction's list of replicas.</li>
* </ol> * </ol>
* *
* @param dn descriptor for the datanode that made the report * @param storageInfo DatanodeStorageInfo that sent the report.
* @param block reported block replica * @param block reported block replica
* @param reportedState reported replica state * @param reportedState reported replica state
* @param toAdd add to DatanodeDescriptor * @param toAdd add to DatanodeDescriptor
@ -1981,14 +1993,16 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept. * @return the up-to-date stored block, if it should be kept.
* Otherwise, null. * Otherwise, null.
*/ */
private BlockInfo processReportedBlock(final DatanodeDescriptor dn, private BlockInfo processReportedBlock(
final String storageID, final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState, final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd, final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate, final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) { final Collection<StatefulBlockInfo> toUC) {
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block LOG.debug("Reported block " + block
+ " on " + dn + " size " + block.getNumBytes() + " on " + dn + " size " + block.getNumBytes()
@ -1997,7 +2011,7 @@ public class BlockManager {
if (shouldPostponeBlocksFromFuture && if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(block)) { namesystem.isGenStampInFuture(block)) {
queueReportedBlock(dn, storageID, block, reportedState, queueReportedBlock(storageInfo, block, reportedState,
QUEUE_REASON_FUTURE_GENSTAMP); QUEUE_REASON_FUTURE_GENSTAMP);
return null; return null;
} }
@ -2037,7 +2051,7 @@ public class BlockManager {
// TODO: Pretty confident this should be s/storedBlock/block below, // TODO: Pretty confident this should be s/storedBlock/block below,
// since we should be postponing the info of the reported block, not // since we should be postponing the info of the reported block, not
// the stored block. See HDFS-6289 for more context. // the stored block. See HDFS-6289 for more context.
queueReportedBlock(dn, storageID, storedBlock, reportedState, queueReportedBlock(storageInfo, storedBlock, reportedState,
QUEUE_REASON_CORRUPT_STATE); QUEUE_REASON_CORRUPT_STATE);
} else { } else {
toCorrupt.add(c); toCorrupt.add(c);
@ -2066,17 +2080,17 @@ public class BlockManager {
* standby node. @see PendingDataNodeMessages. * standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs * @param reason a textual reason to report in the debug logs
*/ */
private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, String reason) { ReplicaState reportedState, String reason) {
assert shouldPostponeBlocksFromFuture; assert shouldPostponeBlocksFromFuture;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Queueing reported block " + block + LOG.debug("Queueing reported block " + block +
" in state " + reportedState + " in state " + reportedState +
" from datanode " + dn + " for later processing " + " from datanode " + storageInfo.getDatanodeDescriptor() +
"because " + reason + "."); " for later processing because " + reason + ".");
} }
pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
} }
/** /**
@ -2099,7 +2113,7 @@ public class BlockManager {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Processing previouly queued message " + rbi); LOG.debug("Processing previouly queued message " + rbi);
} }
processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), processAndHandleReportedBlock(rbi.getStorageInfo(),
rbi.getBlock(), rbi.getReportedState(), null); rbi.getBlock(), rbi.getReportedState(), null);
} }
} }
@ -2219,19 +2233,20 @@ public class BlockManager {
} }
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
DatanodeDescriptor node, String storageID) throws IOException { DatanodeStorageInfo storageInfo) throws IOException {
BlockInfoUnderConstruction block = ucBlock.storedBlock; BlockInfoUnderConstruction block = ucBlock.storedBlock;
block.addReplicaIfNotPresent(node.getStorageInfo(storageID), block.addReplicaIfNotPresent(
ucBlock.reportedBlock, ucBlock.reportedState); storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { if (ucBlock.reportedState == ReplicaState.FINALIZED &&
addStoredBlock(block, node, storageID, null, true); block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
addStoredBlock(block, storageInfo, null, true);
} }
} }
/** /**
* Faster version of * Faster version of
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
* , intended for use with initial block report at startup. If not in startup * , intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is * safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from * called "immediately" so there is no need to refresh the storedBlock from
@ -2242,17 +2257,17 @@ public class BlockManager {
* @throws IOException * @throws IOException
*/ */
private void addStoredBlockImmediate(BlockInfo storedBlock, private void addStoredBlockImmediate(BlockInfo storedBlock,
DatanodeDescriptor node, String storageID) DatanodeStorageInfo storageInfo)
throws IOException { throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock()); assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode() if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) { || namesystem.isPopulatingReplQueues()) {
addStoredBlock(storedBlock, node, storageID, null, false); addStoredBlock(storedBlock, storageInfo, null, false);
return; return;
} }
// just add it // just add it
node.addBlock(storageID, storedBlock); storageInfo.addBlock(storedBlock);
// Now check for completion of blocks and safe block count // Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock); int numCurrentReplica = countLiveNodes(storedBlock);
@ -2274,13 +2289,13 @@ public class BlockManager {
* @return the block that is stored in blockMap. * @return the block that is stored in blockMap.
*/ */
private Block addStoredBlock(final BlockInfo block, private Block addStoredBlock(final BlockInfo block,
DatanodeDescriptor node, DatanodeStorageInfo storageInfo,
String storageID,
DatanodeDescriptor delNodeHint, DatanodeDescriptor delNodeHint,
boolean logEveryBlock) boolean logEveryBlock)
throws IOException { throws IOException {
assert block != null && namesystem.hasWriteLock(); assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock; BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoUnderConstruction) { if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread //refresh our copy in case the block got completed in another thread
storedBlock = blocksMap.getStoredBlock(block); storedBlock = blocksMap.getStoredBlock(block);
@ -2300,7 +2315,7 @@ public class BlockManager {
assert bc != null : "Block must belong to a file"; assert bc != null : "Block must belong to a file";
// add block to the datanode // add block to the datanode
boolean added = node.addBlock(storageID, storedBlock); boolean added = storageInfo.addBlock(storedBlock);
int curReplicaDelta; int curReplicaDelta;
if (added) { if (added) {
@ -2846,8 +2861,9 @@ public class BlockManager {
* The given node is reporting that it received a certain block. * The given node is reporting that it received a certain block.
*/ */
@VisibleForTesting @VisibleForTesting
void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
throws IOException { throws IOException {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Decrement number of blocks scheduled to this datanode. // Decrement number of blocks scheduled to this datanode.
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
// RECEIVED_BLOCK), we currently also decrease the approximate number. // RECEIVED_BLOCK), we currently also decrease the approximate number.
@ -2867,12 +2883,12 @@ public class BlockManager {
// Modify the blocks->datanode map and node's map. // Modify the blocks->datanode map and node's map.
// //
pendingReplications.decrement(block, node); pendingReplications.decrement(block, node);
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode); delHintNode);
} }
private void processAndHandleReportedBlock(DatanodeDescriptor node, private void processAndHandleReportedBlock(
String storageID, Block block, DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode) ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException { throws IOException {
// blockReceived reports a finalized block // blockReceived reports a finalized block
@ -2880,7 +2896,9 @@ public class BlockManager {
Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
processReportedBlock(node, storageID, block, reportedState, final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
processReportedBlock(storageInfo, block, reportedState,
toAdd, toInvalidate, toCorrupt, toUC); toAdd, toInvalidate, toCorrupt, toUC);
// the block is only in one of the to-do lists // the block is only in one of the to-do lists
// if it is in none then data-node already has it // if it is in none then data-node already has it
@ -2888,11 +2906,11 @@ public class BlockManager {
: "The block should be only in one of the lists."; : "The block should be only in one of the lists.";
for (StatefulBlockInfo b : toUC) { for (StatefulBlockInfo b : toUC) {
addStoredBlockUnderConstruction(b, node, storageID); addStoredBlockUnderConstruction(b, storageInfo);
} }
long numBlocksLogged = 0; long numBlocksLogged = 0;
for (BlockInfo b : toAdd) { for (BlockInfo b : toAdd) {
addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++; numBlocksLogged++;
} }
if (numBlocksLogged > maxNumBlocksToLog) { if (numBlocksLogged > maxNumBlocksToLog) {
@ -2906,7 +2924,7 @@ public class BlockManager {
addToInvalidates(b, node); addToInvalidates(b, node);
} }
for (BlockToMarkCorrupt b : toCorrupt) { for (BlockToMarkCorrupt b : toCorrupt) {
markBlockAsCorrupt(b, node, storageID); markBlockAsCorrupt(b, storageInfo, node);
} }
} }
@ -2933,13 +2951,15 @@ public class BlockManager {
"Got incremental block report from unregistered or dead node"); "Got incremental block report from unregistered or dead node");
} }
if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) { DatanodeStorageInfo storageInfo =
node.getStorageInfo(srdb.getStorage().getStorageID());
if (storageInfo == null) {
// The DataNode is reporting an unknown storage. Usually the NN learns // The DataNode is reporting an unknown storage. Usually the NN learns
// about new storages from heartbeats but during NN restart we may // about new storages from heartbeats but during NN restart we may
// receive a block report or incremental report before the heartbeat. // receive a block report or incremental report before the heartbeat.
// We must handle this for protocol compatibility. This issue was // We must handle this for protocol compatibility. This issue was
// uncovered by HDFS-6094. // uncovered by HDFS-6094.
node.updateStorage(srdb.getStorage()); storageInfo = node.updateStorage(srdb.getStorage());
} }
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
@ -2949,14 +2969,13 @@ public class BlockManager {
deleted++; deleted++;
break; break;
case RECEIVED_BLOCK: case RECEIVED_BLOCK:
addBlock(node, srdb.getStorage().getStorageID(), addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
rdbi.getBlock(), rdbi.getDelHints());
received++; received++;
break; break;
case RECEIVING_BLOCK: case RECEIVING_BLOCK:
receiving++; receiving++;
processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(), processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
rdbi.getBlock(), ReplicaState.RBW, null); ReplicaState.RBW, null);
break; break;
default: default:
String msg = String msg =

View File

@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
return blockPoolUsed; return blockPoolUsed;
} }
boolean addBlock(BlockInfo b) { public boolean addBlock(BlockInfo b) {
if(!b.addStorage(this)) if(!b.addStorage(this))
return false; return false;
// add to the head of the data-node list // add to the head of the data-node list

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
/** /**
* In the Standby Node, we can receive messages about blocks * In the Standby Node, we can receive messages about blocks
@ -41,14 +42,12 @@ class PendingDataNodeMessages {
static class ReportedBlockInfo { static class ReportedBlockInfo {
private final Block block; private final Block block;
private final DatanodeDescriptor dn; private final DatanodeStorageInfo storageInfo;
private final String storageID;
private final ReplicaState reportedState; private final ReplicaState reportedState;
ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block, ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) { ReplicaState reportedState) {
this.dn = dn; this.storageInfo = storageInfo;
this.storageID = storageID;
this.block = block; this.block = block;
this.reportedState = reportedState; this.reportedState = reportedState;
} }
@ -57,21 +56,18 @@ class PendingDataNodeMessages {
return block; return block;
} }
DatanodeDescriptor getNode() {
return dn;
}
String getStorageID() {
return storageID;
}
ReplicaState getReportedState() { ReplicaState getReportedState() {
return reportedState; return reportedState;
} }
DatanodeStorageInfo getStorageInfo() {
return storageInfo;
}
@Override @Override
public String toString() { public String toString() {
return "ReportedBlockInfo [block=" + block + ", dn=" + dn return "ReportedBlockInfo [block=" + block + ", dn="
+ storageInfo.getDatanodeDescriptor()
+ ", reportedState=" + reportedState + "]"; + ", reportedState=" + reportedState + "]";
} }
} }
@ -87,7 +83,7 @@ class PendingDataNodeMessages {
Queue<ReportedBlockInfo> oldQueue = entry.getValue(); Queue<ReportedBlockInfo> oldQueue = entry.getValue();
while (!oldQueue.isEmpty()) { while (!oldQueue.isEmpty()) {
ReportedBlockInfo rbi = oldQueue.remove(); ReportedBlockInfo rbi = oldQueue.remove();
if (!rbi.getNode().equals(dn)) { if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
newQueue.add(rbi); newQueue.add(rbi);
} else { } else {
count--; count--;
@ -97,11 +93,11 @@ class PendingDataNodeMessages {
} }
} }
void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState) { ReplicaState reportedState) {
block = new Block(block); block = new Block(block);
getBlockQueue(block).add( getBlockQueue(block).add(
new ReportedBlockInfo(dn, storageID, block, reportedState)); new ReportedBlockInfo(storageInfo, block, reportedState));
count++; count++;
} }

View File

@ -4358,8 +4358,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Otherwise fsck will report these blocks as MISSING, especially if the // Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive. // blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < trimmedTargets.size(); i++) { for (int i = 0; i < trimmedTargets.size(); i++) {
trimmedTargets.get(i).addBlock( DatanodeStorageInfo storageInfo =
trimmedStorages.get(i), storedBlock); trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
storageInfo.addBlock(storedBlock);
}
} }
} }
@ -5838,7 +5841,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
public void processIncrementalBlockReport(final DatanodeID nodeID, public void processIncrementalBlockReport(final DatanodeID nodeID,
final String poolId, final StorageReceivedDeletedBlocks srdb) final StorageReceivedDeletedBlocks srdb)
throws IOException { throws IOException {
writeLock(); writeLock();
try { try {

View File

@ -1060,7 +1060,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
// for the same node and storage, so the value returned by the last // for the same node and storage, so the value returned by the last
// call of this loop is the final updated value for noStaleStorage. // call of this loop is the final updated value for noStaleStorage.
// //
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks); noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
metrics.incrStorageBlockReportOps(); metrics.incrStorageBlockReportOps();
} }
@ -1096,7 +1096,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
+" blocks."); +" blocks.");
} }
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
namesystem.processIncrementalBlockReport(nodeReg, poolId, r); namesystem.processIncrementalBlockReport(nodeReg, r);
} }
} }

View File

@ -368,7 +368,7 @@ public class TestBlockManager {
DatanodeStorageInfo[] pipeline) throws IOException { DatanodeStorageInfo[] pipeline) throws IOException {
for (int i = 1; i < pipeline.length; i++) { for (int i = 1; i < pipeline.length; i++) {
DatanodeStorageInfo storage = pipeline[i]; DatanodeStorageInfo storage = pipeline[i];
bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null); bm.addBlock(storage, blockInfo, null);
blockInfo.addStorage(storage); blockInfo.addStorage(storage);
} }
} }
@ -549,12 +549,12 @@ public class TestBlockManager {
// send block report, should be processed // send block report, should be processed
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null)); new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed // send block report again, should NOT be processed
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null)); new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
@ -566,7 +566,7 @@ public class TestBlockManager {
assertEquals(0, ds.getBlockReportCount()); // ready for report again assertEquals(0, ds.getBlockReportCount()); // ready for report again
// send block report, should be processed after restart // send block report, should be processed after restart
reset(node); reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null)); new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
} }
@ -595,7 +595,7 @@ public class TestBlockManager {
// send block report while pretending to already have blocks // send block report while pretending to already have blocks
reset(node); reset(node);
doReturn(1).when(node).numBlocks(); doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
new BlockListAsLongs(null, null)); new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount()); assertEquals(1, ds.getBlockReportCount());
} }

View File

@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
assertTrue(storages.length > 0); assertTrue(storages.length > 0);
final String storageID = storages[0].getStorageID(); final String storageID = storages[0].getStorageID();
// add first block // add first block
assertTrue(dd.addBlock(storageID, blk)); assertTrue(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks()); assertEquals(1, dd.numBlocks());
// remove a non-existent block // remove a non-existent block
assertFalse(dd.removeBlock(blk1)); assertFalse(dd.removeBlock(blk1));
assertEquals(1, dd.numBlocks()); assertEquals(1, dd.numBlocks());
// add an existent block // add an existent block
assertFalse(dd.addBlock(storageID, blk)); assertFalse(storages[0].addBlock(blk));
assertEquals(1, dd.numBlocks()); assertEquals(1, dd.numBlocks());
// add second block // add second block
assertTrue(dd.addBlock(storageID, blk1)); assertTrue(storages[0].addBlock(blk1));
assertEquals(2, dd.numBlocks()); assertEquals(2, dd.numBlocks());
// remove first block // remove first block
assertTrue(dd.removeBlock(blk)); assertTrue(dd.removeBlock(blk));

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages {
@Test @Test
public void testQueues() { public void testQueues() {
DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED); DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED); DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
assertEquals(2, msgs.count()); assertEquals(2, msgs.count());

View File

@ -76,7 +76,7 @@ public class TestReplicationPolicy {
private static NameNode namenode; private static NameNode namenode;
private static BlockPlacementPolicy replicator; private static BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt"; private static final String filename = "/dummyfile.txt";
private static DatanodeDescriptor dataNodes[]; private static DatanodeDescriptor[] dataNodes;
private static DatanodeStorageInfo[] storages; private static DatanodeStorageInfo[] storages;
// The interval for marking a datanode as stale, // The interval for marking a datanode as stale,
private static final long staleInterval = private static final long staleInterval =