From 1b4f028385d5eba1622db83af4b9177551208255 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Fri, 1 Aug 2014 17:35:14 +0000 Subject: [PATCH] HDFS-6794: Merging r1615169 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1615175 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 159 ++++++++++-------- .../blockmanagement/DatanodeStorageInfo.java | 2 +- .../PendingDataNodeMessages.java | 30 ++-- .../hdfs/server/namenode/FSNamesystem.java | 9 +- .../server/namenode/NameNodeRpcServer.java | 4 +- .../blockmanagement/TestBlockManager.java | 10 +- .../TestDatanodeDescriptor.java | 6 +- .../TestPendingDataNodeMessages.java | 7 +- .../TestReplicationPolicy.java | 2 +- 10 files changed, 128 insertions(+), 104 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a65a82a9c6a..ad067b17fce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.6.0 - UNRELEASED HDFS-6796. Improve the argument check during balancer command line parsing. (Benoy Antony via szetszwo) + HDFS-6794. Update BlockManager methods to use DatanodeStorageInfo + where possible (Arpit Agarwal) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 132249f2c83..b4daa35c7eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1079,6 +1079,7 @@ private void addToInvalidates(Block b) { * Mark the block belonging to datanode as corrupt * @param blk Block to be marked as corrupt * @param dn Datanode which holds the corrupt replica + * @param storageID if known, null otherwise. * @param reason a textual reason why the block should be marked corrupt, * for logging purposes */ @@ -1095,19 +1096,29 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, - blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), - dn, storageID); - } - private void markBlockAsCorrupt(BlockToMarkCorrupt b, - DatanodeInfo dn, String storageID) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot mark " + b + throw new IOException("Cannot mark " + blk + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + ") does not exist"); } + + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), + storageID == null ? null : node.getStorageInfo(storageID), + node); + } + + /** + * + * @param b + * @param storageInfo storage that contains the block, if known. null otherwise. + * @throws IOException + */ + private void markBlockAsCorrupt(BlockToMarkCorrupt b, + DatanodeStorageInfo storageInfo, + DatanodeDescriptor node) throws IOException { BlockCollection bc = b.corrupted.getBlockCollection(); if (bc == null) { @@ -1118,7 +1129,9 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b, } // Add replica to the data-node if it is not already there - node.addBlock(storageID, b.stored); + if (storageInfo != null) { + storageInfo.addBlock(b.stored); + } // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, @@ -1460,7 +1473,7 @@ int computeReplicationWorkForBlocks(List> blocksToReplicate) { * @throws IOException * if the number of targets < minimum replication. * @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * List, boolean, Set, long) + * List, boolean, Set, long, StorageType) */ public DatanodeStorageInfo[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -1697,7 +1710,7 @@ public String toString() { * @throws IOException */ public boolean processReport(final DatanodeID nodeID, - final DatanodeStorage storage, final String poolId, + final DatanodeStorage storage, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock @@ -1729,9 +1742,9 @@ public boolean processReport(final DatanodeID nodeID, if (storageInfo.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. This shortens restart times. - processFirstBlockReport(node, storage.getStorageID(), newReport); + processFirstBlockReport(storageInfo, newReport); } else { - processReport(node, storage, newReport); + processReport(storageInfo, newReport); } // Now that we have an up-to-date block report, we know that any @@ -1793,9 +1806,8 @@ private void rescanPostponedMisreplicatedBlocks() { } } - private void processReport(final DatanodeDescriptor node, - final DatanodeStorage storage, - final BlockListAsLongs report) throws IOException { + private void processReport(final DatanodeStorageInfo storageInfo, + final BlockListAsLongs report) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. @@ -1805,19 +1817,20 @@ private void processReport(final DatanodeDescriptor node, Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); - reportDiff(node, storage, report, + reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storage.getStorageID()); + addStoredBlockUnderConstruction(b, storageInfo); } for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -1831,7 +1844,7 @@ private void processReport(final DatanodeDescriptor node, addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storage.getStorageID()); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -1842,16 +1855,16 @@ private void processReport(final DatanodeDescriptor node, * a toRemove list (since there won't be any). It also silently discards * any invalid blocks, thereby deferring their processing until * the next block report. - * @param node - DatanodeDescriptor of the node that sent the report + * @param storageInfo - DatanodeStorageInfo that sent the report * @param report - the initial block report, to be processed * @throws IOException */ - private void processFirstBlockReport(final DatanodeDescriptor node, - final String storageID, + private void processFirstBlockReport( + final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); - assert (node.getStorageInfo(storageID).numBlocks() == 0); + assert (storageInfo.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); while(itBR.hasNext()) { @@ -1860,7 +1873,7 @@ private void processFirstBlockReport(final DatanodeDescriptor node, if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } @@ -1872,15 +1885,16 @@ private void processFirstBlockReport(final DatanodeDescriptor node, // If block is corrupt, mark it and continue to next block. BlockUCState ucState = storedBlock.getBlockUCState(); BlockToMarkCorrupt c = checkReplicaCorrupt( - iblk, reportedState, storedBlock, ucState, node); + iblk, reportedState, storedBlock, ucState, + storageInfo.getDatanodeDescriptor()); if (c != null) { if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c, node, storageID); + markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor()); } continue; } @@ -1888,7 +1902,7 @@ private void processFirstBlockReport(final DatanodeDescriptor node, // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( - node.getStorageInfo(storageID), iblk, reportedState); + storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 @@ -1901,12 +1915,12 @@ private void processFirstBlockReport(final DatanodeDescriptor node, } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, node, storageID); + addStoredBlockImmediate(storedBlock, storageInfo); } } } - private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, + private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor @@ -1914,8 +1928,6 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list - final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID()); - // place a delimiter in the list which separates blocks // that have been reported from those that have not BlockInfo delimiter = new BlockInfo(new Block(), 1); @@ -1932,7 +1944,7 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState iState = itBR.getCurrentReplicaState(); - BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -1969,7 +1981,7 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, * BlockInfoUnderConstruction's list of replicas. * * - * @param dn descriptor for the datanode that made the report + * @param storageInfo DatanodeStorageInfo that sent the report. * @param block reported block replica * @param reportedState reported replica state * @param toAdd add to DatanodeDescriptor @@ -1981,14 +1993,16 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, * @return the up-to-date stored block, if it should be kept. * Otherwise, null. */ - private BlockInfo processReportedBlock(final DatanodeDescriptor dn, - final String storageID, + private BlockInfo processReportedBlock( + final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, final Collection toAdd, final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + if(LOG.isDebugEnabled()) { LOG.debug("Reported block " + block + " on " + dn + " size " + block.getNumBytes() @@ -1997,7 +2011,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(dn, storageID, block, reportedState, + queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } @@ -2037,7 +2051,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, // TODO: Pretty confident this should be s/storedBlock/block below, // since we should be postponing the info of the reported block, not // the stored block. See HDFS-6289 for more context. - queueReportedBlock(dn, storageID, storedBlock, reportedState, + queueReportedBlock(storageInfo, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { toCorrupt.add(c); @@ -2066,17 +2080,17 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn, * standby node. @see PendingDataNodeMessages. * @param reason a textual reason to report in the debug logs */ - private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, String reason) { assert shouldPostponeBlocksFromFuture; if (LOG.isDebugEnabled()) { LOG.debug("Queueing reported block " + block + " in state " + reportedState + - " from datanode " + dn + " for later processing " + - "because " + reason + "."); + " from datanode " + storageInfo.getDatanodeDescriptor() + + " for later processing because " + reason + "."); } - pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); + pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState); } /** @@ -2099,7 +2113,7 @@ private void processQueuedMessages(Iterable rbis) if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } - processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), + processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } @@ -2219,19 +2233,20 @@ private boolean isBlockUnderConstruction(BlockInfo storedBlock, } void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, - DatanodeDescriptor node, String storageID) throws IOException { + DatanodeStorageInfo storageInfo) throws IOException { BlockInfoUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent(node.getStorageInfo(storageID), - ucBlock.reportedBlock, ucBlock.reportedState); + block.addReplicaIfNotPresent( + storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); - if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { - addStoredBlock(block, node, storageID, null, true); + if (ucBlock.reportedState == ReplicaState.FINALIZED && + block.findDatanode(storageInfo.getDatanodeDescriptor()) < 0) { + addStoredBlock(block, storageInfo, null, true); } } /** * Faster version of - * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} + * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)} * , intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock(). Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from @@ -2242,17 +2257,17 @@ void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, - DatanodeDescriptor node, String storageID) + DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, node, storageID, null, false); + addStoredBlock(storedBlock, storageInfo, null, false); return; } // just add it - node.addBlock(storageID, storedBlock); + storageInfo.addBlock(storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2274,13 +2289,13 @@ private void addStoredBlockImmediate(BlockInfo storedBlock, * @return the block that is stored in blockMap. */ private Block addStoredBlock(final BlockInfo block, - DatanodeDescriptor node, - String storageID, + DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread storedBlock = blocksMap.getStoredBlock(block); @@ -2300,7 +2315,7 @@ private Block addStoredBlock(final BlockInfo block, assert bc != null : "Block must belong to a file"; // add block to the datanode - boolean added = node.addBlock(storageID, storedBlock); + boolean added = storageInfo.addBlock(storedBlock); int curReplicaDelta; if (added) { @@ -2846,8 +2861,9 @@ private long addBlock(Block block, List results) { * The given node is reporting that it received a certain block. */ @VisibleForTesting - void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) + void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint) throws IOException { + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // RECEIVED_BLOCK), we currently also decrease the approximate number. @@ -2867,12 +2883,12 @@ void addBlock(DatanodeDescriptor node, String storageID, Block block, String del // Modify the blocks->datanode map and node's map. // pendingReplications.decrement(block, node); - processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, + processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock(DatanodeDescriptor node, - String storageID, Block block, + private void processAndHandleReportedBlock( + DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block @@ -2880,7 +2896,9 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); - processReportedBlock(node, storageID, block, reportedState, + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); + + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it @@ -2888,11 +2906,11 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, : "The block should be only in one of the lists."; for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storageID); + addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2906,7 +2924,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storageID); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -2933,13 +2951,15 @@ public void processIncrementalBlockReport(final DatanodeID nodeID, "Got incremental block report from unregistered or dead node"); } - if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) { + DatanodeStorageInfo storageInfo = + node.getStorageInfo(srdb.getStorage().getStorageID()); + if (storageInfo == null) { // The DataNode is reporting an unknown storage. Usually the NN learns // about new storages from heartbeats but during NN restart we may // receive a block report or incremental report before the heartbeat. // We must handle this for protocol compatibility. This issue was // uncovered by HDFS-6094. - node.updateStorage(srdb.getStorage()); + storageInfo = node.updateStorage(srdb.getStorage()); } for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { @@ -2949,14 +2969,13 @@ public void processIncrementalBlockReport(final DatanodeID nodeID, deleted++; break; case RECEIVED_BLOCK: - addBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), rdbi.getDelHints()); + addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints()); received++; break; case RECEIVING_BLOCK: receiving++; - processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), ReplicaState.RBW, null); + processAndHandleReportedBlock(storageInfo, rdbi.getBlock(), + ReplicaState.RBW, null); break; default: String msg = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index fa4b0e533bd..791fc3157d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -207,7 +207,7 @@ long getBlockPoolUsed() { return blockPoolUsed; } - boolean addBlock(BlockInfo b) { + public boolean addBlock(BlockInfo b) { if(!b.addStorage(this)) return false; // add to the head of the data-node list diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java index 0a1ba65f125..5f59f0267a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; /** * In the Standby Node, we can receive messages about blocks @@ -41,14 +42,12 @@ class PendingDataNodeMessages { static class ReportedBlockInfo { private final Block block; - private final DatanodeDescriptor dn; - private final String storageID; + private final DatanodeStorageInfo storageInfo; private final ReplicaState reportedState; - ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block, + ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { - this.dn = dn; - this.storageID = storageID; + this.storageInfo = storageInfo; this.block = block; this.reportedState = reportedState; } @@ -57,21 +56,18 @@ Block getBlock() { return block; } - DatanodeDescriptor getNode() { - return dn; - } - - String getStorageID() { - return storageID; - } - ReplicaState getReportedState() { return reportedState; } + + DatanodeStorageInfo getStorageInfo() { + return storageInfo; + } @Override public String toString() { - return "ReportedBlockInfo [block=" + block + ", dn=" + dn + return "ReportedBlockInfo [block=" + block + ", dn=" + + storageInfo.getDatanodeDescriptor() + ", reportedState=" + reportedState + "]"; } } @@ -87,7 +83,7 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) { Queue oldQueue = entry.getValue(); while (!oldQueue.isEmpty()) { ReportedBlockInfo rbi = oldQueue.remove(); - if (!rbi.getNode().equals(dn)) { + if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) { newQueue.add(rbi); } else { count--; @@ -97,11 +93,11 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) { } } - void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { block = new Block(block); getBlockQueue(block).add( - new ReportedBlockInfo(dn, storageID, block, reportedState)); + new ReportedBlockInfo(storageInfo, block, reportedState)); count++; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d529cb6caf9..3df62ac171f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4358,8 +4358,11 @@ void commitBlockSynchronization(ExtendedBlock lastblock, // Otherwise fsck will report these blocks as MISSING, especially if the // blocksReceived from Datanodes take a long time to arrive. for (int i = 0; i < trimmedTargets.size(); i++) { - trimmedTargets.get(i).addBlock( - trimmedStorages.get(i), storedBlock); + DatanodeStorageInfo storageInfo = + trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i)); + if (storageInfo != null) { + storageInfo.addBlock(storedBlock); + } } } @@ -5838,7 +5841,7 @@ NamenodeCommand startCheckpoint(NamenodeRegistration backupNode, } public void processIncrementalBlockReport(final DatanodeID nodeID, - final String poolId, final StorageReceivedDeletedBlocks srdb) + final StorageReceivedDeletedBlocks srdb) throws IOException { writeLock(); try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index e451fb5ea58..3ab022b10bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1060,7 +1060,7 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg, // for the same node and storage, so the value returned by the last // call of this loop is the final updated value for noStaleStorage. // - noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks); + noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks); metrics.incrStorageBlockReportOps(); } @@ -1096,7 +1096,7 @@ public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, +" blocks."); } for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { - namesystem.processIncrementalBlockReport(nodeReg, poolId, r); + namesystem.processIncrementalBlockReport(nodeReg, r); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index e632ed1ca97..41af2370d14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -368,7 +368,7 @@ private void fulfillPipeline(BlockInfo blockInfo, DatanodeStorageInfo[] pipeline) throws IOException { for (int i = 1; i < pipeline.length; i++) { DatanodeStorageInfo storage = pipeline[i]; - bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null); + bm.addBlock(storage, blockInfo, null); blockInfo.addStorage(storage); } } @@ -549,12 +549,12 @@ public void testSafeModeIBR() throws Exception { // send block report, should be processed reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); @@ -566,7 +566,7 @@ public void testSafeModeIBR() throws Exception { assertEquals(0, ds.getBlockReportCount()); // ready for report again // send block report, should be processed after restart reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); } @@ -595,7 +595,7 @@ public void testSafeModeIBRAfterIncremental() throws Exception { // send block report while pretending to already have blocks reset(node); doReturn(1).when(node).numBlocks(); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java index 12674eb318a..2d7eaf3dcfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java @@ -63,16 +63,16 @@ public void testBlocksCounter() throws Exception { assertTrue(storages.length > 0); final String storageID = storages[0].getStorageID(); // add first block - assertTrue(dd.addBlock(storageID, blk)); + assertTrue(storages[0].addBlock(blk)); assertEquals(1, dd.numBlocks()); // remove a non-existent block assertFalse(dd.removeBlock(blk1)); assertEquals(1, dd.numBlocks()); // add an existent block - assertFalse(dd.addBlock(storageID, blk)); + assertFalse(storages[0].addBlock(blk)); assertEquals(1, dd.numBlocks()); // add second block - assertTrue(dd.addBlock(storageID, blk1)); + assertTrue(storages[0].addBlock(blk1)); assertEquals(2, dd.numBlocks()); // remove first block assertTrue(dd.removeBlock(blk)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java index a17d32e6672..981ae76a10a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.junit.Test; import com.google.common.base.Joiner; @@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages { @Test public void testQueues() { DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); - msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED); - msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED); + DatanodeStorage storage = new DatanodeStorage("STORAGE_ID"); + DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage); + msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED); + msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED); assertEquals(2, msgs.count()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 3db50245f18..7390b5a6f8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -76,7 +76,7 @@ public class TestReplicationPolicy { private static NameNode namenode; private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; - private static DatanodeDescriptor dataNodes[]; + private static DatanodeDescriptor[] dataNodes; private static DatanodeStorageInfo[] storages; // The interval for marking a datanode as stale, private static final long staleInterval =