diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e439d546ed3..b1e533f182b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -249,6 +249,9 @@ Release 2.1.0-beta - 2013-07-02
 
     HADOOP-9760. Move GSet and related classes to common from HDFS. (suresh)
 
+    HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent.
+    (jing9)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 92138d91b34..98fc9427c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1336,7 +1336,7 @@ public class BlockManager {
       // Move the block-replication into a "pending" state.
       // The reason we use 'pending' is so we can retry
       // replications that fail after an appropriate amount of time.
-      pendingReplications.increment(block, targets.length);
+      pendingReplications.increment(block, targets);
       if(blockLog.isDebugEnabled()) {
         blockLog.debug(
             "BLOCK* block " + block
@@ -2622,6 +2622,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
+    // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with
+    // RECEIVED_BLOCK), we currently also decrease the approximate number.
     node.decBlocksScheduled();
 
     // get the deletion hint node
@@ -2637,7 +2639,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.decrement(block);
+    pendingReplications.decrement(block, node);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }
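The core of the patch is visible in these two BlockManager call sites: blockReceivedAndDeleted can be retried by a DataNode after an RPC timeout even when the NameNode already applied the first attempt. With the old integer count, the retry decremented pendingReplications a second time for a single completed transfer; tracking the concrete target nodes makes the repeated decrement a no-op. A minimal standalone sketch of the difference (hypothetical names, not HDFS code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PendingSketch {
  // Old scheme: an anonymous counter. A retried notification from the same
  // DataNode decrements twice, under-counting the work still in flight.
  static int counter = 2;

  // New scheme: the pending target nodes themselves.
  static List<String> targets =
      new ArrayList<String>(Arrays.asList("dn1", "dn2"));

  public static void main(String[] args) {
    // First "block received" notification from dn1, then an RPC retry of it:
    counter--;
    counter--;                 // drops to 0 although dn2 is still copying
    targets.remove("dn1");
    targets.remove("dn1");     // no-op: dn1 has already been removed
    System.out.println(counter);          // 0 (wrong)
    System.out.println(targets.size());   // 1 (dn2 still pending)
  }
}
```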
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
index 048968d12af..6b07b789341 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
@@ -22,8 +22,10 @@ import static org.apache.hadoop.util.Time.now;
 import java.io.PrintWriter;
 import java.sql.Time;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -71,14 +73,16 @@ class PendingReplicationBlocks {
 
   /**
    * Add a block to the list of pending Replications
+   * @param block The corresponding block
+   * @param targets The DataNodes where replicas of the block should be placed
    */
-  void increment(Block block, int numReplicas) {
+  void increment(Block block, DatanodeDescriptor[] targets) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
-        pendingReplications.put(block, new PendingBlockInfo(numReplicas));
+        pendingReplications.put(block, new PendingBlockInfo(targets));
       } else {
-        found.incrementReplicas(numReplicas);
+        found.incrementReplicas(targets);
         found.setTimeStamp();
       }
     }
@@ -88,15 +92,17 @@ class PendingReplicationBlocks {
    * One replication request for this block has finished.
    * Decrement the number of pending replication requests
    * for this block.
+   *
+   * @param dn The DataNode that finishes the replication
    */
-  void decrement(Block block) {
+  void decrement(Block block, DatanodeDescriptor dn) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Removing pending replication for " + block);
         }
-        found.decrementReplicas();
+        found.decrementReplicas(dn);
         if (found.getNumReplicas() <= 0) {
           pendingReplications.remove(block);
         }
@@ -153,7 +159,7 @@ class PendingReplicationBlocks {
       return null;
     }
     Block[] blockList = timedOutItems.toArray(
-        new Block[timedOutItems.size()]);
+          new Block[timedOutItems.size()]);
     timedOutItems.clear();
     return blockList;
   }
@@ -163,16 +169,17 @@ class PendingReplicationBlocks {
    * An object that contains information about a block that
    * is being replicated. It records the timestamp when the
    * system started replicating the most recent copy of this
-   * block. It also records the number of replication
-   * requests that are in progress.
+   * block. It also records the list of Datanodes where the
+   * replication requests are in progress.
    */
   static class PendingBlockInfo {
     private long timeStamp;
-    private int numReplicasInProgress;
+    private final List<DatanodeDescriptor> targets;
 
-    PendingBlockInfo(int numReplicas) {
+    PendingBlockInfo(DatanodeDescriptor[] targets) {
       this.timeStamp = now();
-      this.numReplicasInProgress = numReplicas;
+      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
+          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
     }
 
     long getTimeStamp() {
@@ -183,17 +190,20 @@ class PendingReplicationBlocks {
       timeStamp = now();
     }
 
-    void incrementReplicas(int increment) {
-      numReplicasInProgress += increment;
+    void incrementReplicas(DatanodeDescriptor... newTargets) {
+      if (newTargets != null) {
+        for (DatanodeDescriptor dn : newTargets) {
+          targets.add(dn);
+        }
+      }
     }
 
-    void decrementReplicas() {
-      numReplicasInProgress--;
-      assert(numReplicasInProgress >= 0);
+    void decrementReplicas(DatanodeDescriptor dn) {
+      targets.remove(dn);
     }
 
     int getNumReplicas() {
-      return numReplicasInProgress;
+      return targets.size();
     }
   }
 
@@ -274,7 +284,7 @@ class PendingReplicationBlocks {
         out.println(block +
                     " StartTime: " + new Time(pendingBlock.timeStamp) +
                     " NumReplicaInProgress: " +
-                    pendingBlock.numReplicasInProgress);
+                    pendingBlock.getNumReplicas());
       }
     }
   }
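A design note on the new bookkeeping: PendingBlockInfo keeps a List rather than a Set, which behaves like a multiset. If the same node were registered by two separate increments, two decrements are still expected, because ArrayList.remove(Object) drops one occurrence per call and ignores absent elements. A standalone sketch of those semantics (not HDFS code):

```java
import java.util.ArrayList;
import java.util.List;

public class ListSemantics {
  public static void main(String[] args) {
    List<String> targets = new ArrayList<String>();
    targets.add("dn1");
    targets.add("dn1");                    // registered twice
    targets.remove("dn1");                 // first completion
    System.out.println(targets.size());    // 1 -> one still outstanding
    targets.remove("dn2");                 // unknown node: no-op
    System.out.println(targets.size());    // still 1
  }
}
```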
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dd917cd4430..65225e478d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -982,7 +982,8 @@ public class DataNode extends Configured
    * @return BP registration object
    * @throws IOException
    */
-  DatanodeRegistration getDNRegistrationForBP(String bpid)
+  @VisibleForTesting
+  public DatanodeRegistration getDNRegistrationForBP(String bpid)
       throws IOException {
     BPOfferService bpos = blockPoolManager.get(bpid);
     if(bpos==null || bpos.bpRegistration==null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 988590b5af4..8a500f4db61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -138,7 +138,7 @@ public interface DatanodeProtocol {
    * writes a new Block here, or another DataNode copies a Block to
    * this DataNode, it will call blockReceived().
    */
-  @AtMostOnce
+  @Idempotent
   public void blockReceivedAndDeleted(DatanodeRegistration registration,
                             String poolId,
                             StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
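The annotation swap is the protocol-level half of the change. In Hadoop RPC, a method marked @AtMostOnce must not be blindly re-executed, since running it twice can change server state, while @Idempotent tells the client retry policies that re-issuing the call on timeout or failover is safe. The bookkeeping changes above make re-execution of blockReceivedAndDeleted harmless, so the weaker contract now suffices. A sketch of how the two markers are applied to an RPC interface (the annotations are the real ones from hadoop-common; the interface itself is invented):

```java
import java.io.IOException;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;

interface ExampleProtocol {
  // Safe to re-execute: a repeated call leaves the server in the same state,
  // so the RPC layer may retry it on timeouts or failover.
  @Idempotent
  void reportStatus(String nodeId) throws IOException;

  // Not safe to re-execute: retries must be deduplicated so the operation
  // takes effect at most once.
  @AtMostOnce
  void createThing(String name) throws IOException;
}
```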
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index b83e371fe13..0c1fee98646 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -28,13 +30,21 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class tests the internals of PendingReplicationBlocks.java,
  * as well as how PendingReplicationBlocks acts in BlockManager
@@ -44,7 +54,22 @@ public class TestPendingReplication {
   private static final int DFS_REPLICATION_INTERVAL = 1;
   // Number of datanodes in the cluster
   private static final int DATANODE_COUNT = 5;
 
+  private DatanodeDescriptor genDatanodeId(int seed) {
+    seed = seed % 256;
+    String ip = seed + "." + seed + "." + seed + "." + seed;
+    return DFSTestUtil.getDatanodeDescriptor(ip, null);
+  }
+
+  private DatanodeDescriptor[] genDatanodes(int number) {
+    Preconditions.checkArgument(number >= 0);
+    DatanodeDescriptor[] nodes = new DatanodeDescriptor[number];
+    for (int i = 0; i < number; i++) {
+      nodes[i] = genDatanodeId(i);
+    }
+    return nodes;
+  }
+
   @Test
   public void testPendingReplication() {
     PendingReplicationBlocks pendingReplications;
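One property of these helpers worth noting: genDatanodeId is deterministic, so building a descriptor twice from the same seed yields objects that compare equal, and the tests below rely on this when they pass a freshly constructed descriptor to decrement() for a node originally registered via genDatanodes(). The seed-to-address mapping, as a standalone sketch (not HDFS code):

```java
public class SeedIp {
  static String ipOf(int seed) {
    seed = seed % 256;  // keep every octet in the valid range
    return seed + "." + seed + "." + seed + "." + seed;
  }

  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      System.out.println(ipOf(i));  // 0.0.0.0, 1.1.1.1, 2.2.2.2
    }
  }
}
```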
@@ -56,7 +81,7 @@ public class TestPendingReplication {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, i);
+      pendingReplications.increment(block, genDatanodes(i));
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -66,15 +91,16 @@ public class TestPendingReplication {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.decrement(blk); // removes one replica
+    pendingReplications.decrement(blk, genDatanodeId(7)); // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.decrement(blk); // removes all replicas
+      // removes all replicas
+      pendingReplications.decrement(blk, genDatanodeId(i));
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.increment(blk, 8);
+    pendingReplications.increment(blk, genDatanodes(8));
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -102,7 +128,7 @@ public class TestPendingReplication {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.increment(block, i);
+      pendingReplications.increment(block, genDatanodes(i));
     }
     assertTrue(pendingReplications.size() == 15);
 
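The updated assertions keep the original arithmetic, now expressed per node: genDatanodes(8) registers descriptors for seeds 0 through 7 against block 8, the single decrement with genDatanodeId(7) leaves seven outstanding, and the loop over seeds 0 through 6 drains the rest. The same counting, as a standalone sketch with strings standing in for DatanodeDescriptors:

```java
import java.util.ArrayList;
import java.util.List;

public class DrainWalkthrough {
  public static void main(String[] args) {
    List<String> targets = new ArrayList<String>();
    for (int seed = 0; seed < 8; seed++) {   // genDatanodes(8): seeds 0..7
      targets.add(seed + "." + seed + "." + seed + "." + seed);
    }
    targets.remove("7.7.7.7");               // decrement(blk, genDatanodeId(7))
    System.out.println(targets.size());      // 7, matching the assertion
    for (int seed = 0; seed < 7; seed++) {   // drain the remaining seeds 0..6
      targets.remove(seed + "." + seed + "." + seed + "." + seed);
    }
    System.out.println(targets.size());      // 0: the entry would be removed
  }
}
```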
@@ -133,6 +159,102 @@ public class TestPendingReplication {
     pendingReplications.stop();
   }
 
+  /**
+   * Test if DatanodeProtocol#blockReceivedAndDeleted can correctly update the
+   * pending replications. Also make sure the blockReceivedAndDeleted call is
+   * idempotent with respect to the pending replications.
+   */
+  @Test
+  public void testBlockReceived() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+          DATANODE_COUNT).build();
+      cluster.waitActive();
+
+      DistributedFileSystem hdfs = cluster.getFileSystem();
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager blkManager = fsn.getBlockManager();
+
+      final String file = "/tmp.txt";
+      final Path filePath = new Path(file);
+      short replFactor = 1;
+      DFSTestUtil.createFile(hdfs, filePath, 1024L, replFactor, 0);
+
+      // temporarily stop the heartbeat
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      for (int i = 0; i < DATANODE_COUNT; i++) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(datanodes.get(i), true);
+      }
+
+      hdfs.setReplication(filePath, (short) DATANODE_COUNT);
+      BlockManagerTestUtil.computeAllPendingWork(blkManager);
+
+      assertEquals(1, blkManager.pendingReplications.size());
+      INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
+      Block[] blocks = fileNode.getBlocks();
+      assertEquals(DATANODE_COUNT - 1,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      LocatedBlock locatedBlock = hdfs.getClient().getLocatedBlocks(file, 0)
+          .get(0);
+      DatanodeInfo existingDn = (locatedBlock.getLocations())[0];
+      int reportDnNum = 0;
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      // let two datanodes (other than the one that already has the data)
+      // report to NN
+      for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
+        if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
+          DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
+              poolId);
+          StorageReceivedDeletedBlocks[] report = {
+              new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+                  new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+                      blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
+          cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
+          reportDnNum++;
+        }
+      }
+
+      assertEquals(DATANODE_COUNT - 3,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      // let the same datanodes report again
+      reportDnNum = 0;
+      for (int i = 0; i < DATANODE_COUNT && reportDnNum < 2; i++) {
+        if (!datanodes.get(i).getDatanodeId().equals(existingDn)) {
+          DatanodeRegistration dnR = datanodes.get(i).getDNRegistrationForBP(
+              poolId);
+          StorageReceivedDeletedBlocks[] report =
+              { new StorageReceivedDeletedBlocks(dnR.getStorageID(),
+                  new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+                      blocks[0], BlockStatus.RECEIVED_BLOCK, "") }) };
+          cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, report);
+          reportDnNum++;
+        }
+      }
+
+      assertEquals(DATANODE_COUNT - 3,
+          blkManager.pendingReplications.getNumReplicas(blocks[0]));
+
+      // re-enable heartbeat for the datanode that has data
+      for (int i = 0; i < DATANODE_COUNT; i++) {
+        DataNodeTestUtils
+            .setHeartbeatsDisabledForTests(datanodes.get(i), false);
+        DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+      }
+
+      Thread.sleep(5000);
+      assertEquals(0, blkManager.pendingReplications.size());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Test if BlockManager can correctly remove corresponding pending records
    * when a file is deleted