HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo where possible. (Arpit Agarwal)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1615169 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7e12b1912f
commit
45db4d204b
|
@ -343,6 +343,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-6796. Improve the argument check during balancer command line parsing.
|
||||
(Benoy Antony via szetszwo)
|
||||
|
||||
HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo
|
||||
where possible (Arpit Agarwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-6690. Deduplicate xattr names in memory. (wang)
|
||||
|
|
|
@ -1079,6 +1079,7 @@ public class BlockManager {
|
|||
* Mark the block belonging to datanode as corrupt
|
||||
* @param blk Block to be marked as corrupt
|
||||
* @param dn Datanode which holds the corrupt replica
|
||||
* @param storageID if known, null otherwise.
|
||||
* @param reason a textual reason why the block should be marked corrupt,
|
||||
* for logging purposes
|
||||
*/
|
||||
|
@ -1095,20 +1096,30 @@ public class BlockManager {
|
|||
+ blk + " not found");
|
||||
return;
|
||||
}
|
||||
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
||||
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
||||
dn, storageID);
|
||||
}
|
||||
|
||||
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
||||
DatanodeInfo dn, String storageID) throws IOException {
|
||||
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
|
||||
if (node == null) {
|
||||
throw new IOException("Cannot mark " + b
|
||||
throw new IOException("Cannot mark " + blk
|
||||
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
|
||||
+ ") does not exist");
|
||||
}
|
||||
|
||||
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
||||
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
||||
storageID == null ? null : node.getStorageInfo(storageID),
|
||||
node);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param b
|
||||
* @param storageInfo storage that contains the block, if known. null otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
||||
DatanodeStorageInfo storageInfo,
|
||||
DatanodeDescriptor node) throws IOException {
|
||||
|
||||
BlockCollection bc = b.corrupted.getBlockCollection();
|
||||
if (bc == null) {
|
||||
blockLog.info("BLOCK markBlockAsCorrupt: " + b
|
||||
|
@ -1118,7 +1129,9 @@ public class BlockManager {
|
|||
}
|
||||
|
||||
// Add replica to the data-node if it is not already there
|
||||
node.addBlock(storageID, b.stored);
|
||||
if (storageInfo != null) {
|
||||
storageInfo.addBlock(b.stored);
|
||||
}
|
||||
|
||||
// Add this replica to corruptReplicas Map
|
||||
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
||||
|
@ -1457,7 +1470,7 @@ public class BlockManager {
|
|||
* @throws IOException
|
||||
* if the number of targets < minimum replication.
|
||||
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
|
||||
* List, boolean, Set, long)
|
||||
* List, boolean, Set, long, StorageType)
|
||||
*/
|
||||
public DatanodeStorageInfo[] chooseTarget(final String src,
|
||||
final int numOfReplicas, final DatanodeDescriptor client,
|
||||
|
@ -1694,7 +1707,7 @@ public class BlockManager {
|
|||
* @throws IOException
|
||||
*/
|
||||
public boolean processReport(final DatanodeID nodeID,
|
||||
final DatanodeStorage storage, final String poolId,
|
||||
final DatanodeStorage storage,
|
||||
final BlockListAsLongs newReport) throws IOException {
|
||||
namesystem.writeLock();
|
||||
final long startTime = Time.now(); //after acquiring write lock
|
||||
|
@ -1726,9 +1739,9 @@ public class BlockManager {
|
|||
if (storageInfo.numBlocks() == 0) {
|
||||
// The first block report can be processed a lot more efficiently than
|
||||
// ordinary block reports. This shortens restart times.
|
||||
processFirstBlockReport(node, storage.getStorageID(), newReport);
|
||||
processFirstBlockReport(storageInfo, newReport);
|
||||
} else {
|
||||
processReport(node, storage, newReport);
|
||||
processReport(storageInfo, newReport);
|
||||
}
|
||||
|
||||
// Now that we have an up-to-date block report, we know that any
|
||||
|
@ -1790,8 +1803,7 @@ public class BlockManager {
|
|||
}
|
||||
}
|
||||
|
||||
private void processReport(final DatanodeDescriptor node,
|
||||
final DatanodeStorage storage,
|
||||
private void processReport(final DatanodeStorageInfo storageInfo,
|
||||
final BlockListAsLongs report) throws IOException {
|
||||
// Normal case:
|
||||
// Modify the (block-->datanode) map, according to the difference
|
||||
|
@ -1802,19 +1814,20 @@ public class BlockManager {
|
|||
Collection<Block> toInvalidate = new LinkedList<Block>();
|
||||
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
||||
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
||||
reportDiff(node, storage, report,
|
||||
reportDiff(storageInfo, report,
|
||||
toAdd, toRemove, toInvalidate, toCorrupt, toUC);
|
||||
|
||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
// Process the blocks on each queue
|
||||
for (StatefulBlockInfo b : toUC) {
|
||||
addStoredBlockUnderConstruction(b, node, storage.getStorageID());
|
||||
addStoredBlockUnderConstruction(b, storageInfo);
|
||||
}
|
||||
for (Block b : toRemove) {
|
||||
removeStoredBlock(b, node);
|
||||
}
|
||||
int numBlocksLogged = 0;
|
||||
for (BlockInfo b : toAdd) {
|
||||
addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog);
|
||||
addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
|
||||
numBlocksLogged++;
|
||||
}
|
||||
if (numBlocksLogged > maxNumBlocksToLog) {
|
||||
|
@ -1828,7 +1841,7 @@ public class BlockManager {
|
|||
addToInvalidates(b, node);
|
||||
}
|
||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||
markBlockAsCorrupt(b, node, storage.getStorageID());
|
||||
markBlockAsCorrupt(b, storageInfo, node);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1839,16 +1852,16 @@ public class BlockManager {
|
|||
* a toRemove list (since there won't be any). It also silently discards
|
||||
* any invalid blocks, thereby deferring their processing until
|
||||
* the next block report.
|
||||
* @param node - DatanodeDescriptor of the node that sent the report
|
||||
* @param storageInfo - DatanodeStorageInfo that sent the report
|
||||
* @param report - the initial block report, to be processed
|
||||
* @throws IOException
|
||||
*/
|
||||
private void processFirstBlockReport(final DatanodeDescriptor node,
|
||||
final String storageID,
|
||||
private void processFirstBlockReport(
|
||||
final DatanodeStorageInfo storageInfo,
|
||||
final BlockListAsLongs report) throws IOException {
|
||||
if (report == null) return;
|
||||
assert (namesystem.hasWriteLock());
|
||||
assert (node.getStorageInfo(storageID).numBlocks() == 0);
|
||||
assert (storageInfo.numBlocks() == 0);
|
||||
BlockReportIterator itBR = report.getBlockReportIterator();
|
||||
|
||||
while(itBR.hasNext()) {
|
||||
|
@ -1857,7 +1870,7 @@ public class BlockManager {
|
|||
|
||||
if (shouldPostponeBlocksFromFuture &&
|
||||
namesystem.isGenStampInFuture(iblk)) {
|
||||
queueReportedBlock(node, storageID, iblk, reportedState,
|
||||
queueReportedBlock(storageInfo, iblk, reportedState,
|
||||
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||
continue;
|
||||
}
|
||||
|
@ -1869,15 +1882,16 @@ public class BlockManager {
|
|||
// If block is corrupt, mark it and continue to next block.
|
||||
BlockUCState ucState = storedBlock.getBlockUCState();
|
||||
BlockToMarkCorrupt c = checkReplicaCorrupt(
|
||||
iblk, reportedState, storedBlock, ucState, node);
|
||||
iblk, reportedState, storedBlock, ucState,
|
||||
storageInfo.getDatanodeDescriptor());
|
||||
if (c != null) {
|
||||
if (shouldPostponeBlocksFromFuture) {
|
||||
// In the Standby, we may receive a block report for a file that we
|
||||
// just have an out-of-date gen-stamp or state for, for example.
|
||||
queueReportedBlock(node, storageID, iblk, reportedState,
|
||||
queueReportedBlock(storageInfo, iblk, reportedState,
|
||||
QUEUE_REASON_CORRUPT_STATE);
|
||||
} else {
|
||||
markBlockAsCorrupt(c, node, storageID);
|
||||
markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
@ -1885,7 +1899,7 @@ public class BlockManager {
|
|||
// If block is under construction, add this replica to its list
|
||||
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
|
||||
((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
|
||||
node.getStorageInfo(storageID), iblk, reportedState);
|
||||
storageInfo, iblk, reportedState);
|
||||
// OpenFileBlocks only inside snapshots also will be added to safemode
|
||||
// threshold. So we need to update such blocks to safemode
|
||||
// refer HDFS-5283
|
||||
|
@ -1898,12 +1912,12 @@ public class BlockManager {
|
|||
}
|
||||
//add replica if appropriate
|
||||
if (reportedState == ReplicaState.FINALIZED) {
|
||||
addStoredBlockImmediate(storedBlock, node, storageID);
|
||||
addStoredBlockImmediate(storedBlock, storageInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
|
||||
private void reportDiff(DatanodeStorageInfo storageInfo,
|
||||
BlockListAsLongs newReport,
|
||||
Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
|
||||
Collection<Block> toRemove, // remove from DatanodeDescriptor
|
||||
|
@ -1911,8 +1925,6 @@ public class BlockManager {
|
|||
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
|
||||
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
|
||||
|
||||
final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
|
||||
|
||||
// place a delimiter in the list which separates blocks
|
||||
// that have been reported from those that have not
|
||||
BlockInfo delimiter = new BlockInfo(new Block(), 1);
|
||||
|
@ -1929,7 +1941,7 @@ public class BlockManager {
|
|||
while(itBR.hasNext()) {
|
||||
Block iblk = itBR.next();
|
||||
ReplicaState iState = itBR.getCurrentReplicaState();
|
||||
BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
|
||||
BlockInfo storedBlock = processReportedBlock(storageInfo,
|
||||
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
|
||||
|
||||
// move block to the head of the list
|
||||
|
@ -1966,7 +1978,7 @@ public class BlockManager {
|
|||
* BlockInfoUnderConstruction's list of replicas.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param dn descriptor for the datanode that made the report
|
||||
* @param storageInfo DatanodeStorageInfo that sent the report.
|
||||
* @param block reported block replica
|
||||
* @param reportedState reported replica state
|
||||
* @param toAdd add to DatanodeDescriptor
|
||||
|
@ -1978,14 +1990,16 @@ public class BlockManager {
|
|||
* @return the up-to-date stored block, if it should be kept.
|
||||
* Otherwise, null.
|
||||
*/
|
||||
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
|
||||
final String storageID,
|
||||
private BlockInfo processReportedBlock(
|
||||
final DatanodeStorageInfo storageInfo,
|
||||
final Block block, final ReplicaState reportedState,
|
||||
final Collection<BlockInfo> toAdd,
|
||||
final Collection<Block> toInvalidate,
|
||||
final Collection<BlockToMarkCorrupt> toCorrupt,
|
||||
final Collection<StatefulBlockInfo> toUC) {
|
||||
|
||||
DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Reported block " + block
|
||||
+ " on " + dn + " size " + block.getNumBytes()
|
||||
|
@ -1994,7 +2008,7 @@ public class BlockManager {
|
|||
|
||||
if (shouldPostponeBlocksFromFuture &&
|
||||
namesystem.isGenStampInFuture(block)) {
|
||||
queueReportedBlock(dn, storageID, block, reportedState,
|
||||
queueReportedBlock(storageInfo, block, reportedState,
|
||||
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||
return null;
|
||||
}
|
||||
|
@ -2034,7 +2048,7 @@ public class BlockManager {
|
|||
// TODO: Pretty confident this should be s/storedBlock/block below,
|
||||
// since we should be postponing the info of the reported block, not
|
||||
// the stored block. See HDFS-6289 for more context.
|
||||
queueReportedBlock(dn, storageID, storedBlock, reportedState,
|
||||
queueReportedBlock(storageInfo, storedBlock, reportedState,
|
||||
QUEUE_REASON_CORRUPT_STATE);
|
||||
} else {
|
||||
toCorrupt.add(c);
|
||||
|
@ -2063,17 +2077,17 @@ public class BlockManager {
|
|||
* standby node. @see PendingDataNodeMessages.
|
||||
* @param reason a textual reason to report in the debug logs
|
||||
*/
|
||||
private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
|
||||
private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
|
||||
ReplicaState reportedState, String reason) {
|
||||
assert shouldPostponeBlocksFromFuture;
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Queueing reported block " + block +
|
||||
" in state " + reportedState +
|
||||
" from datanode " + dn + " for later processing " +
|
||||
"because " + reason + ".");
|
||||
" from datanode " + storageInfo.getDatanodeDescriptor() +
|
||||
" for later processing because " + reason + ".");
|
||||
}
|
||||
pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState);
|
||||
pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2096,7 +2110,7 @@ public class BlockManager {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing previouly queued message " + rbi);
|
||||
}
|
||||
processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(),
|
||||
processAndHandleReportedBlock(rbi.getStorageInfo(),
|
||||
rbi.getBlock(), rbi.getReportedState(), null);
|
||||
}
|
||||
}
|
||||
|
@ -2216,19 +2230,20 @@ public class BlockManager {
|
|||
}
|
||||
|
||||
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
|
||||
DatanodeDescriptor node, String storageID) throws IOException {
|
||||
DatanodeStorageInfo storageInfo) throws IOException {
|
||||
BlockInfoUnderConstruction block = ucBlock.storedBlock;
|
||||
block.addReplicaIfNotPresent(node.getStorageInfo(storageID),
|
||||
ucBlock.reportedBlock, ucBlock.reportedState);
|
||||
block.addReplicaIfNotPresent(
|
||||
storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
|
||||
|
||||
if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
|
||||
addStoredBlock(block, node, storageID, null, true);
|
||||
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
|
||||
block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) {
|
||||
addStoredBlock(block, storageInfo, null, true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Faster version of
|
||||
* {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)}
|
||||
* {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)}
|
||||
* , intended for use with initial block report at startup. If not in startup
|
||||
* safe mode, will call standard addStoredBlock(). Assumes this method is
|
||||
* called "immediately" so there is no need to refresh the storedBlock from
|
||||
|
@ -2239,17 +2254,17 @@ public class BlockManager {
|
|||
* @throws IOException
|
||||
*/
|
||||
private void addStoredBlockImmediate(BlockInfo storedBlock,
|
||||
DatanodeDescriptor node, String storageID)
|
||||
DatanodeStorageInfo storageInfo)
|
||||
throws IOException {
|
||||
assert (storedBlock != null && namesystem.hasWriteLock());
|
||||
if (!namesystem.isInStartupSafeMode()
|
||||
|| namesystem.isPopulatingReplQueues()) {
|
||||
addStoredBlock(storedBlock, node, storageID, null, false);
|
||||
addStoredBlock(storedBlock, storageInfo, null, false);
|
||||
return;
|
||||
}
|
||||
|
||||
// just add it
|
||||
node.addBlock(storageID, storedBlock);
|
||||
storageInfo.addBlock(storedBlock);
|
||||
|
||||
// Now check for completion of blocks and safe block count
|
||||
int numCurrentReplica = countLiveNodes(storedBlock);
|
||||
|
@ -2271,13 +2286,13 @@ public class BlockManager {
|
|||
* @return the block that is stored in blockMap.
|
||||
*/
|
||||
private Block addStoredBlock(final BlockInfo block,
|
||||
DatanodeDescriptor node,
|
||||
String storageID,
|
||||
DatanodeStorageInfo storageInfo,
|
||||
DatanodeDescriptor delNodeHint,
|
||||
boolean logEveryBlock)
|
||||
throws IOException {
|
||||
assert block != null && namesystem.hasWriteLock();
|
||||
BlockInfo storedBlock;
|
||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
if (block instanceof BlockInfoUnderConstruction) {
|
||||
//refresh our copy in case the block got completed in another thread
|
||||
storedBlock = blocksMap.getStoredBlock(block);
|
||||
|
@ -2297,7 +2312,7 @@ public class BlockManager {
|
|||
assert bc != null : "Block must belong to a file";
|
||||
|
||||
// add block to the datanode
|
||||
boolean added = node.addBlock(storageID, storedBlock);
|
||||
boolean added = storageInfo.addBlock(storedBlock);
|
||||
|
||||
int curReplicaDelta;
|
||||
if (added) {
|
||||
|
@ -2843,8 +2858,9 @@ public class BlockManager {
|
|||
* The given node is reporting that it received a certain block.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint)
|
||||
void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint)
|
||||
throws IOException {
|
||||
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
// Decrement number of blocks scheduled to this datanode.
|
||||
// for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
|
||||
// RECEIVED_BLOCK), we currently also decrease the approximate number.
|
||||
|
@ -2864,12 +2880,12 @@ public class BlockManager {
|
|||
// Modify the blocks->datanode map and node's map.
|
||||
//
|
||||
pendingReplications.decrement(block, node);
|
||||
processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED,
|
||||
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
|
||||
delHintNode);
|
||||
}
|
||||
|
||||
private void processAndHandleReportedBlock(DatanodeDescriptor node,
|
||||
String storageID, Block block,
|
||||
private void processAndHandleReportedBlock(
|
||||
DatanodeStorageInfo storageInfo, Block block,
|
||||
ReplicaState reportedState, DatanodeDescriptor delHintNode)
|
||||
throws IOException {
|
||||
// blockReceived reports a finalized block
|
||||
|
@ -2877,7 +2893,9 @@ public class BlockManager {
|
|||
Collection<Block> toInvalidate = new LinkedList<Block>();
|
||||
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
|
||||
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
|
||||
processReportedBlock(node, storageID, block, reportedState,
|
||||
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
|
||||
|
||||
processReportedBlock(storageInfo, block, reportedState,
|
||||
toAdd, toInvalidate, toCorrupt, toUC);
|
||||
// the block is only in one of the to-do lists
|
||||
// if it is in none then data-node already has it
|
||||
|
@ -2885,11 +2903,11 @@ public class BlockManager {
|
|||
: "The block should be only in one of the lists.";
|
||||
|
||||
for (StatefulBlockInfo b : toUC) {
|
||||
addStoredBlockUnderConstruction(b, node, storageID);
|
||||
addStoredBlockUnderConstruction(b, storageInfo);
|
||||
}
|
||||
long numBlocksLogged = 0;
|
||||
for (BlockInfo b : toAdd) {
|
||||
addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
||||
addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
|
||||
numBlocksLogged++;
|
||||
}
|
||||
if (numBlocksLogged > maxNumBlocksToLog) {
|
||||
|
@ -2903,7 +2921,7 @@ public class BlockManager {
|
|||
addToInvalidates(b, node);
|
||||
}
|
||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||
markBlockAsCorrupt(b, node, storageID);
|
||||
markBlockAsCorrupt(b, storageInfo, node);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2930,13 +2948,15 @@ public class BlockManager {
|
|||
"Got incremental block report from unregistered or dead node");
|
||||
}
|
||||
|
||||
if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
|
||||
DatanodeStorageInfo storageInfo =
|
||||
node.getStorageInfo(srdb.getStorage().getStorageID());
|
||||
if (storageInfo == null) {
|
||||
// The DataNode is reporting an unknown storage. Usually the NN learns
|
||||
// about new storages from heartbeats but during NN restart we may
|
||||
// receive a block report or incremental report before the heartbeat.
|
||||
// We must handle this for protocol compatibility. This issue was
|
||||
// uncovered by HDFS-6094.
|
||||
node.updateStorage(srdb.getStorage());
|
||||
storageInfo = node.updateStorage(srdb.getStorage());
|
||||
}
|
||||
|
||||
for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
|
||||
|
@ -2946,14 +2966,13 @@ public class BlockManager {
|
|||
deleted++;
|
||||
break;
|
||||
case RECEIVED_BLOCK:
|
||||
addBlock(node, srdb.getStorage().getStorageID(),
|
||||
rdbi.getBlock(), rdbi.getDelHints());
|
||||
addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints());
|
||||
received++;
|
||||
break;
|
||||
case RECEIVING_BLOCK:
|
||||
receiving++;
|
||||
processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
|
||||
rdbi.getBlock(), ReplicaState.RBW, null);
|
||||
processAndHandleReportedBlock(storageInfo, rdbi.getBlock(),
|
||||
ReplicaState.RBW, null);
|
||||
break;
|
||||
default:
|
||||
String msg =
|
||||
|
|
|
@ -207,7 +207,7 @@ public class DatanodeStorageInfo {
|
|||
return blockPoolUsed;
|
||||
}
|
||||
|
||||
boolean addBlock(BlockInfo b) {
|
||||
public boolean addBlock(BlockInfo b) {
|
||||
if(!b.addStorage(this))
|
||||
return false;
|
||||
// add to the head of the data-node list
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
|||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
|
||||
/**
|
||||
* In the Standby Node, we can receive messages about blocks
|
||||
|
@ -41,14 +42,12 @@ class PendingDataNodeMessages {
|
|||
|
||||
static class ReportedBlockInfo {
|
||||
private final Block block;
|
||||
private final DatanodeDescriptor dn;
|
||||
private final String storageID;
|
||||
private final DatanodeStorageInfo storageInfo;
|
||||
private final ReplicaState reportedState;
|
||||
|
||||
ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
|
||||
ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block,
|
||||
ReplicaState reportedState) {
|
||||
this.dn = dn;
|
||||
this.storageID = storageID;
|
||||
this.storageInfo = storageInfo;
|
||||
this.block = block;
|
||||
this.reportedState = reportedState;
|
||||
}
|
||||
|
@ -57,21 +56,18 @@ class PendingDataNodeMessages {
|
|||
return block;
|
||||
}
|
||||
|
||||
DatanodeDescriptor getNode() {
|
||||
return dn;
|
||||
}
|
||||
|
||||
String getStorageID() {
|
||||
return storageID;
|
||||
}
|
||||
|
||||
ReplicaState getReportedState() {
|
||||
return reportedState;
|
||||
}
|
||||
|
||||
DatanodeStorageInfo getStorageInfo() {
|
||||
return storageInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ReportedBlockInfo [block=" + block + ", dn=" + dn
|
||||
return "ReportedBlockInfo [block=" + block + ", dn="
|
||||
+ storageInfo.getDatanodeDescriptor()
|
||||
+ ", reportedState=" + reportedState + "]";
|
||||
}
|
||||
}
|
||||
|
@ -87,7 +83,7 @@ class PendingDataNodeMessages {
|
|||
Queue<ReportedBlockInfo> oldQueue = entry.getValue();
|
||||
while (!oldQueue.isEmpty()) {
|
||||
ReportedBlockInfo rbi = oldQueue.remove();
|
||||
if (!rbi.getNode().equals(dn)) {
|
||||
if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) {
|
||||
newQueue.add(rbi);
|
||||
} else {
|
||||
count--;
|
||||
|
@ -97,11 +93,11 @@ class PendingDataNodeMessages {
|
|||
}
|
||||
}
|
||||
|
||||
void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
|
||||
void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
|
||||
ReplicaState reportedState) {
|
||||
block = new Block(block);
|
||||
getBlockQueue(block).add(
|
||||
new ReportedBlockInfo(dn, storageID, block, reportedState));
|
||||
new ReportedBlockInfo(storageInfo, block, reportedState));
|
||||
count++;
|
||||
}
|
||||
|
||||
|
|
|
@ -4355,8 +4355,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
// Otherwise fsck will report these blocks as MISSING, especially if the
|
||||
// blocksReceived from Datanodes take a long time to arrive.
|
||||
for (int i = 0; i < trimmedTargets.size(); i++) {
|
||||
trimmedTargets.get(i).addBlock(
|
||||
trimmedStorages.get(i), storedBlock);
|
||||
DatanodeStorageInfo storageInfo =
|
||||
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
|
||||
if (storageInfo != null) {
|
||||
storageInfo.addBlock(storedBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5835,7 +5838,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
|
||||
public void processIncrementalBlockReport(final DatanodeID nodeID,
|
||||
final String poolId, final StorageReceivedDeletedBlocks srdb)
|
||||
final StorageReceivedDeletedBlocks srdb)
|
||||
throws IOException {
|
||||
writeLock();
|
||||
try {
|
||||
|
|
|
@ -1065,7 +1065,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
// for the same node and storage, so the value returned by the last
|
||||
// call of this loop is the final updated value for noStaleStorage.
|
||||
//
|
||||
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
|
||||
noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
|
||||
metrics.incrStorageBlockReportOps();
|
||||
}
|
||||
|
||||
|
@ -1101,7 +1101,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
|||
+" blocks.");
|
||||
}
|
||||
for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
|
||||
namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
|
||||
namesystem.processIncrementalBlockReport(nodeReg, r);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -368,7 +368,7 @@ public class TestBlockManager {
|
|||
DatanodeStorageInfo[] pipeline) throws IOException {
|
||||
for (int i = 1; i < pipeline.length; i++) {
|
||||
DatanodeStorageInfo storage = pipeline[i];
|
||||
bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
|
||||
bm.addBlock(storage, blockInfo, null);
|
||||
blockInfo.addStorage(storage);
|
||||
}
|
||||
}
|
||||
|
@ -549,12 +549,12 @@ public class TestBlockManager {
|
|||
// send block report, should be processed
|
||||
reset(node);
|
||||
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||
new BlockListAsLongs(null, null));
|
||||
assertEquals(1, ds.getBlockReportCount());
|
||||
// send block report again, should NOT be processed
|
||||
reset(node);
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||
new BlockListAsLongs(null, null));
|
||||
assertEquals(1, ds.getBlockReportCount());
|
||||
|
||||
|
@ -566,7 +566,7 @@ public class TestBlockManager {
|
|||
assertEquals(0, ds.getBlockReportCount()); // ready for report again
|
||||
// send block report, should be processed after restart
|
||||
reset(node);
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||
new BlockListAsLongs(null, null));
|
||||
assertEquals(1, ds.getBlockReportCount());
|
||||
}
|
||||
|
@ -595,7 +595,7 @@ public class TestBlockManager {
|
|||
// send block report while pretending to already have blocks
|
||||
reset(node);
|
||||
doReturn(1).when(node).numBlocks();
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
|
||||
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
|
||||
new BlockListAsLongs(null, null));
|
||||
assertEquals(1, ds.getBlockReportCount());
|
||||
}
|
||||
|
|
|
@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
|
|||
assertTrue(storages.length > 0);
|
||||
final String storageID = storages[0].getStorageID();
|
||||
// add first block
|
||||
assertTrue(dd.addBlock(storageID, blk));
|
||||
assertTrue(storages[0].addBlock(blk));
|
||||
assertEquals(1, dd.numBlocks());
|
||||
// remove a non-existent block
|
||||
assertFalse(dd.removeBlock(blk1));
|
||||
assertEquals(1, dd.numBlocks());
|
||||
// add an existent block
|
||||
assertFalse(dd.addBlock(storageID, blk));
|
||||
assertFalse(storages[0].addBlock(blk));
|
||||
assertEquals(1, dd.numBlocks());
|
||||
// add second block
|
||||
assertTrue(dd.addBlock(storageID, blk1));
|
||||
assertTrue(storages[0].addBlock(blk1));
|
||||
assertEquals(2, dd.numBlocks());
|
||||
// remove first block
|
||||
assertTrue(dd.removeBlock(blk));
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages {
|
|||
@Test
|
||||
public void testQueues() {
|
||||
DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
|
||||
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
|
||||
msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
|
||||
DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
|
||||
DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
|
||||
msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
|
||||
msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
|
||||
|
||||
assertEquals(2, msgs.count());
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ public class TestReplicationPolicy {
|
|||
private static NameNode namenode;
|
||||
private static BlockPlacementPolicy replicator;
|
||||
private static final String filename = "/dummyfile.txt";
|
||||
private static DatanodeDescriptor dataNodes[];
|
||||
private static DatanodeDescriptor[] dataNodes;
|
||||
private static DatanodeStorageInfo[] storages;
|
||||
// The interval for marking a datanode as stale,
|
||||
private static final long staleInterval =
|
||||
|
@ -1118,8 +1118,7 @@ public class TestReplicationPolicy {
|
|||
// Adding this block will increase its current replication, and that will
|
||||
// remove it from the queue.
|
||||
bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
|
||||
ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
|
||||
"STORAGE");
|
||||
ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
|
||||
|
||||
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
|
||||
// from QUEUE_VERY_UNDER_REPLICATED.
|
||||
|
|
Loading…
Reference in New Issue