diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c13a725fb75..3c6054932c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -849,6 +849,9 @@ Trunk (Unreleased)
     HDFS-8438. Erasure Coding: Allow concat striped files if they have the same
     ErasureCodingPolicy. (Walter Su via jing9)
 
+    HDFS-9275. Wait previous ErasureCodingWork to finish before schedule
+    another one. (Walter Su via yliu)
+
 Release 2.8.0 - UNRELEASED
 
   NEW FEATURES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 897df1ef43d..dbe072605a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1586,6 +1586,10 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (block.isStriped()) {
+      if (pendingNum > 0) {
+        // Wait for the previous recovery to finish.
+        return null;
+      }
       short[] indices = new short[liveBlockIndices.size()];
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
         indices[i] = liveBlockIndices.get(i);
@@ -1641,6 +1645,7 @@ public class BlockManager implements BlockStatsMXBean {
       if (block.isStriped()) {
         assert rw instanceof ErasureCodingWork;
         assert rw.getTargets().length > 0;
+        assert pendingNum == 0 : "Should wait for the previous recovery to finish";
         String src = getBlockCollection(block).getName();
         ErasureCodingPolicy ecPolicy = null;
         try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index cc6e7d37de3..9942a2d8270 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -61,10 +61,10 @@ public class StripedFileTestUtil {
   public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
   public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
 
-  static final int stripesPerBlock = 4;
-  static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
-  static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
-  static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
+  public static final int stripesPerBlock = 4;
+  public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
+  public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+  public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
   static byte[] generateBytes(int cnt) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index f1ce8ff0ba8..76b471ad9d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -57,6 +57,8 @@ public class TestDFSStripedOutputStream {
     int numDNs = dataBlocks + parityBlocks + 2;
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
index f521d8edb18..b5ffb38b0bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithMissingBlocks.java
@@ -41,9 +41,9 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 public class TestReadStripedFileWithMissingBlocks {
   public static final Log LOG = LogFactory
       .getLog(TestReadStripedFileWithMissingBlocks.class);
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem fs;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final int fileLength = blockSize * dataBlocks + 123;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
index ec7594fc524..5b9245b1f6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -41,11 +39,9 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -80,9 +76,12 @@ public class TestRecoverStripedFile {
   public void setup() throws IOException {
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize - 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();;
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
     cluster.waitActive();
 
     fs = cluster.getFileSystem();
@@ -251,82 +250,56 @@ public class TestRecoverStripedFile {
           lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
       assertTrue(metadatas[i].getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
-      replicaContents[i] = readReplica(replicas[i]);
+      replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
     }
 
     int cellsNum = (fileLen - 1) / cellSize + 1;
     int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
-    try {
-      DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        /*
-         * Kill the datanode which contains one replica
-         * We need to make sure it dead in namenode: clear its update time and
-         * trigger NN to check heartbeat.
-         */
-        DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
-        dn.shutdown();
-        dnIDs[i] = dn.getDatanodeId();
-      }
-      setDataNodesDead(dnIDs);
-
-      // Check the locatedBlocks of the file again
-      locatedBlocks = getLocatedBlocks(file);
-      lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-      storageInfos = lastBlock.getLocations();
-      assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
-
-      int[] targetDNs = new int[dnNum - groupSize];
-      n = 0;
-      for (int i = 0; i < dnNum; i++) {
-        if (!bitset.get(i)) { // not contain replica of the block.
-          targetDNs[n++] = i;
-        }
-      }
-
-      waitForRecoveryFinished(file, groupSize);
-
-      targetDNs = sortTargetsByReplicas(blocks, targetDNs);
-
-      // Check the replica on the new target node.
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
-        File metadataAfterRecovery =
-            cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
-        assertEquals(replicaAfterRecovery.length(), replicas[i].length());
-        assertTrue(metadataAfterRecovery.getName().
-            endsWith(blocks[i].getGenerationStamp() + ".meta"));
-        byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
-
-        Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
-      }
-    } finally {
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        restartDataNode(toDead[i]);
-      }
-      cluster.waitActive();
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      /*
+       * Kill the datanode which contains one replica
+       * We need to make sure it dead in namenode: clear its update time and
+       * trigger NN to check heartbeat.
+       */
+      DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
+      dn.shutdown();
+      cluster.setDataNodeDead(dn.getDatanodeId());
     }
-    fs.delete(file, true);
-  }
-
-  private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
-    for (DatanodeID dn : dnIDs) {
-      DatanodeDescriptor dnd =
-          NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
-      DFSTestUtil.setDatanodeDead(dnd);
+
+    // Check the locatedBlocks of the file again
+    locatedBlocks = getLocatedBlocks(file);
+    lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+    storageInfos = lastBlock.getLocations();
+    assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+
+    int[] targetDNs = new int[dnNum - groupSize];
+    n = 0;
+    for (int i = 0; i < dnNum; i++) {
+      if (!bitset.get(i)) { // not contain replica of the block.
+        targetDNs[n++] = i;
+      }
     }
-    BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
-  }
-
-  private void restartDataNode(int dn) {
-    try {
-      cluster.restartDataNode(dn, true, true);
-    } catch (IOException e) {
+    waitForRecoveryFinished(file, groupSize);
+
+    targetDNs = sortTargetsByReplicas(blocks, targetDNs);
+
+    // Check the replica on the new target node.
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+      File metadataAfterRecovery =
+          cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
+      assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+      assertTrue(metadataAfterRecovery.getName().
+          endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      byte[] replicaContentAfterRecovery =
+          DFSTestUtil.readFileAsBytes(replicaAfterRecovery);
+
+      Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
     }
   }
-
+
   private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
     int[] result = new int[blocks.length];
     for (int i = 0; i < blocks.length; i++) {
@@ -347,31 +320,7 @@ public class TestRecoverStripedFile {
     }
     return result;
   }
-
-  private byte[] readReplica(File replica) throws IOException {
-    int length = (int)replica.length();
-    ByteArrayOutputStream content = new ByteArrayOutputStream(length);
-    FileInputStream in = new FileInputStream(replica);
-    try {
-      byte[] buffer = new byte[1024];
-      int total = 0;
-      while (total < length) {
-        int n = in.read(buffer);
-        if (n <= 0) {
-          break;
-        }
-        content.write(buffer, 0, n);
-        total += n;
-      }
-      if (total < length) {
-        Assert.fail("Failed to read all content of replica");
-      }
-      return content.toByteArray();
-    } finally {
-      in.close();
-    }
-  }
-
+
   private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize)
       throws Exception {
     final int ATTEMPTS = 60;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
index 9853b8a59fd..6d711d1f333 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
@@ -46,8 +46,8 @@ public class TestSafeModeWithStripedFile {
   static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   static final int blockSize = cellSize * 2;
 
-  static MiniDFSCluster cluster;
-  static Configuration conf;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
 
   @Before
   public void setup() throws IOException {
@@ -57,7 +57,6 @@ public class TestSafeModeWithStripedFile {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     cluster.waitActive();
-
   }
 
   @After
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index 4beb01ff8fa..3ea6eea237f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -47,11 +47,11 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
 public class TestWriteReadStripedFile {
   public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem fs;
   private static int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private static short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
 
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
@@ -64,6 +64,8 @@ public class TestWriteReadStripedFile {
   @Before
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     fs = cluster.getFileSystem();
     fs.mkdirs(new Path("/ec"));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
index 764527d9074..6dcff69d882 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -38,9 +38,9 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
 public class TestWriteStripedFileWithFailure {
   public static final Log LOG = LogFactory
       .getLog(TestWriteStripedFileWithFailure.class);
-  private static MiniDFSCluster cluster;
-  private static FileSystem fs;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
 
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
index ae33ffe3401..a9b2aaab1a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java
@@ -36,8 +36,8 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
   private final static int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final static int stripesPerBlock = 4;
   private final static int numDNs = dataBlocks + parityBlocks + 2;
-  private static MiniDFSCluster cluster;
-  private static Configuration conf;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
 
   {
     BLOCK_SIZE = cellSize * stripesPerBlock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index 6774aedba33..101601e92d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -21,30 +21,41 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
+
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.Test;
 
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestRecoverStripedBlocks {
+  private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final short GROUP_SIZE = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
+  private MiniDFSCluster cluster;
   private final Path dirPath = new Path("/dir");
   private Path filePath = new Path(dirPath, "file");
@@ -166,4 +177,63 @@ public class TestRecoverStripedBlocks {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void test2RecoveryTasksForSameBlockGroup() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      fs.getClient().setErasureCodingPolicy("/", null);
+      int fileLen = NUM_DATA_BLOCKS * blockSize;
+      Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
+      final byte[] data = new byte[fileLen];
+      DFSTestUtil.writeFile(fs, p, data);
+
+      LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient()
+          .getLocatedBlocks(p.toString(), 0).get(0);
+      LocatedBlock[] lbs = StripedBlockUtil.parseStripedBlockGroup(lb,
+          cellSize, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
+      assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(0, bm.getPendingReplicationBlocksCount());
+
+      // missing 1 block, so 1 task should be scheduled
+      DatanodeInfo dn0 = lbs[0].getLocations()[0];
+      cluster.stopDataNode(dn0.getName());
+      cluster.setDataNodeDead(dn0);
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(1, bm.getPendingReplicationBlocksCount());
+
+      // missing another block, but no new task should be scheduled because
+      // previous task isn't finished.
+      DatanodeInfo dn1 = lbs[1].getLocations()[0];
+      cluster.stopDataNode(dn1.getName());
+      cluster.setDataNodeDead(dn1);
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(1, bm.getPendingReplicationBlocksCount());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static int getNumberOfBlocksToBeErasureCoded(MiniDFSCluster cluster)
+      throws Exception {
+    DatanodeManager dm =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    int count = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeDescriptor dd = dm.getDatanode(dn.getDatanodeId());
+      count += dd.getNumberOfBlocksToBeErasureCoded();
+    }
+    return count;
+  }
 }
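For illustration only, the scheduling guard the patch adds to BlockManager can be reduced to a small, self-contained sketch. The names below (RecoveryScheduler, pendingRecoveries, scheduleRecovery) are hypothetical stand-ins, not HDFS APIs; the sketch shows only the idea of skipping a new erasure-coding recovery task for a block group while a previously scheduled one is still pending.

import java.util.HashMap;
import java.util.Map;

public class RecoveryScheduler {
  /** Number of recovery tasks already scheduled (pending) per block group id. */
  private final Map<Long, Integer> pendingRecoveries = new HashMap<>();

  /**
   * Returns true if a new recovery task was scheduled, false if one is already
   * pending for this block group (mirrors the "pendingNum > 0 -> return null" guard).
   */
  public boolean scheduleRecovery(long blockGroupId) {
    int pendingNum = pendingRecoveries.getOrDefault(blockGroupId, 0);
    if (pendingNum > 0) {
      // Wait for the previous recovery to finish before scheduling another one.
      return false;
    }
    pendingRecoveries.put(blockGroupId, 1);
    return true;
  }

  /** Called when a recovery task completes, allowing the next one to be scheduled. */
  public void recoveryFinished(long blockGroupId) {
    pendingRecoveries.remove(blockGroupId);
  }

  public static void main(String[] args) {
    RecoveryScheduler scheduler = new RecoveryScheduler();
    System.out.println(scheduler.scheduleRecovery(1L)); // true: first task scheduled
    System.out.println(scheduler.scheduleRecovery(1L)); // false: previous task still pending
    scheduler.recoveryFinished(1L);
    System.out.println(scheduler.scheduleRecovery(1L)); // true: can schedule again
  }
}

The new test2RecoveryTasksForSameBlockGroup test exercises exactly this behavior at the cluster level: after the second datanode failure, the number of blocks to be erasure coded and the pending replication count both stay at 1.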