diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d34f4664c15..2ab56720364 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -2,6 +2,11 @@ Hadoop HDFS Change Log Release 0.23-PB - Unreleased + NEW FEATURES + + HDFS-395. DFS Scalability: Incremental block reports. (Tomasz Nykiel + via hairong) + IMPROVEMENTS HDFS-2018. Move all journal stream management code into one place. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 84a866f3d05..250781320c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -59,10 +59,12 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.Daemon; @@ -2006,7 +2008,7 @@ public class BlockManager { * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. */ - private void removeStoredBlock(Block block, DatanodeDescriptor node) { + public void removeStoredBlock(Block block, DatanodeDescriptor node) { if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: " + block + " from " + node.getName()); @@ -2125,27 +2127,48 @@ public class BlockManager { } } - /** The given node is reporting that it received a certain block. */ - public void blockReceived(final DatanodeID nodeID, final String poolId, - final Block block, final String delHint) throws IOException { + /** The given node is reporting that it received/deleted certain blocks. */ + public void blockReceivedAndDeleted(final DatanodeID nodeID, + final String poolId, + final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[] + ) throws IOException { namesystem.writeLock(); + int received = 0; + int deleted = 0; try { final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID); if (node == null || !node.isAlive) { - final String s = block + " is received from dead or unregistered node " - + nodeID.getName(); - NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s); - throw new IOException(s); - } - - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block - + " is received from " + nodeID.getName()); + NameNode.stateChangeLog + .warn("BLOCK* blockReceivedDeleted" + + " is received from dead or unregistered node " + + nodeID.getName()); + throw new IOException( + "Got blockReceivedDeleted message from unregistered or dead node"); } - addBlock(node, block, delHint); + for (int i = 0; i < receivedAndDeletedBlocks.length; i++) { + if (receivedAndDeletedBlocks[i].isDeletedBlock()) { + removeStoredBlock( + receivedAndDeletedBlocks[i].getBlock(), node); + deleted++; + } else { + addBlock(node, receivedAndDeletedBlocks[i].getBlock(), + receivedAndDeletedBlocks[i].getDelHints()); + received++; + } + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("BLOCK* block" + + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted" + : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock() + + " is received from " + nodeID.getName()); + } + } } finally { namesystem.writeUnlock(); + NameNode.stateChangeLog + .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from " + + nodeID.getName() + " received: " + received + ", " + + " deleted: " + deleted); } } @@ -2320,6 +2343,7 @@ public class BlockManager { } public void removeBlock(Block block) { + block.setNumBytes(BlockCommand.NO_ACK); addToInvalidates(block); corruptReplicas.removeFromCorruptReplicasMap(block); blocksMap.removeBlock(block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index a58de18f04c..ba5226f2e24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -87,6 +88,7 @@ class BPOfferService implements Runnable { DatanodeRegistration bpRegistration; long lastBlockReport = 0; + long lastDeletedReport = 0; boolean resetBlockReportTime = true; @@ -94,8 +96,9 @@ class BPOfferService implements Runnable { DatanodeProtocol bpNamenode; private long lastHeartbeat = 0; private volatile boolean initialized = false; - private final LinkedList receivedBlockList = new LinkedList(); - private final LinkedList delHints = new LinkedList(); + private final LinkedList receivedAndDeletedBlockList + = new LinkedList(); + private volatile int pendingReceivedRequests = 0; private volatile boolean shouldServiceRun = true; UpgradeManagerDatanode upgradeManager = null; private final DataNode dn; @@ -271,39 +274,30 @@ class BPOfferService implements Runnable { * Report received blocks and delete hints to the Namenode * @throws IOException */ - private void reportReceivedBlocks() throws IOException { - //check if there are newly received blocks - Block [] blockArray=null; - String [] delHintArray=null; - synchronized(receivedBlockList) { - synchronized(delHints){ - int numBlocks = receivedBlockList.size(); - if (numBlocks > 0) { - if(numBlocks!=delHints.size()) { - LOG.warn("Panic: receiveBlockList and delHints are not of " + - "the same length" ); - } - // - // Send newly-received blockids to namenode - // - blockArray = receivedBlockList.toArray(new Block[numBlocks]); - delHintArray = delHints.toArray(new String[numBlocks]); - } + private void reportReceivedDeletedBlocks() throws IOException { + + // check if there are newly received blocks + ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null; + int currentReceivedRequestsCounter; + synchronized (receivedAndDeletedBlockList) { + currentReceivedRequestsCounter = pendingReceivedRequests; + int numBlocks = receivedAndDeletedBlockList.size(); + if (numBlocks > 0) { + // + // Send newly-received and deleted blockids to namenode + // + receivedAndDeletedBlockArray = receivedAndDeletedBlockList + .toArray(new ReceivedDeletedBlockInfo[numBlocks]); } } - if (blockArray != null) { - if(delHintArray == null || delHintArray.length != blockArray.length ) { - LOG.warn("Panic: block array & delHintArray are not the same" ); - } - bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray, - delHintArray); - synchronized(receivedBlockList) { - synchronized(delHints){ - for(int i=0; i