HDFS-9275. Wait previous ErasureCodingWork to finish before schedule another one. (Walter Su via yliu)

yliu 2015-11-03 09:14:32 +08:00
parent 7632409482
commit 5ba2b98d0f
11 changed files with 145 additions and 115 deletions

View File

@@ -849,6 +849,9 @@ Trunk (Unreleased)
     HDFS-8438. Erasure Coding: Allow concat striped files if they have the same
     ErasureCodingPolicy. (Walter Su via jing9)
 
+    HDFS-9275. Wait previous ErasureCodingWork to finish before schedule
+    another one. (Walter Su via yliu)
+
 Release 2.8.0 - UNRELEASED
 
   NEW FEATURES

View File

@@ -1586,6 +1586,10 @@ private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) {
     }
 
     if (block.isStriped()) {
+      if (pendingNum > 0) {
+        // Wait the previous recovery to finish.
+        return null;
+      }
       short[] indices = new short[liveBlockIndices.size()];
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
         indices[i] = liveBlockIndices.get(i);
@@ -1641,6 +1645,7 @@ private boolean validateRecoveryWork(BlockRecoveryWork rw) {
     if (block.isStriped()) {
       assert rw instanceof ErasureCodingWork;
       assert rw.getTargets().length > 0;
+      assert pendingNum == 0: "Should wait the previous recovery to finish";
       String src = getBlockCollection(block).getName();
       ErasureCodingPolicy ecPolicy = null;
       try {
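Taken together, the two hunks above form a simple guard: scheduleRecovery now returns null for a striped block group that already has recovery work pending (pendingNum > 0), and validateRecoveryWork asserts that invariant when the queued work is re-validated. A minimal standalone sketch of the same idea, using hypothetical names rather than the real BlockManager types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the HDFS-9275 guard: at most one recovery task may be
// pending per striped block group. All names here are illustrative only, not
// the actual BlockManager API.
class RecoveryScheduler {
  // pending recovery tasks keyed by block group id
  private final Map<Long, Integer> pendingRecoveries = new ConcurrentHashMap<>();

  /** Returns a task id, or null if a recovery for this group is still pending. */
  Long scheduleRecovery(long blockGroupId) {
    int pendingNum = pendingRecoveries.getOrDefault(blockGroupId, 0);
    if (pendingNum > 0) {
      // Wait for the previous recovery to finish before scheduling another one.
      return null;
    }
    pendingRecoveries.put(blockGroupId, 1);
    return blockGroupId; // stand-in for the scheduled ErasureCodingWork
  }

  /** Called when the recovery task for the group completes (or times out). */
  void recoveryFinished(long blockGroupId) {
    pendingRecoveries.remove(blockGroupId);
  }
}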

View File

@@ -61,10 +61,10 @@ public class StripedFileTestUtil {
   public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
   public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
 
-  static final int stripesPerBlock = 4;
-  static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
-  static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
-  static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
+  public static final int stripesPerBlock = 4;
+  public static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
+  public static final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+  public static final int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
   static byte[] generateBytes(int cnt) {

View File

@@ -57,6 +57,8 @@ public void setup() throws IOException {
     int numDNs = dataBlocks + parityBlocks + 2;
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);

View File

@@ -41,9 +41,9 @@
 public class TestReadStripedFileWithMissingBlocks {
   public static final Log LOG = LogFactory
       .getLog(TestReadStripedFileWithMissingBlocks.class);
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem fs;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
   private final short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final int fileLength = blockSize * dataBlocks + 123;

View File

@@ -20,9 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -41,11 +39,9 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@@ -80,9 +76,12 @@ public class TestRecoverStripedFile {
   public void setup() throws IOException {
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize - 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();;
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -251,82 +250,56 @@ private void assertFileBlocksRecovery(String fileName, int fileLen,
           lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
       assertTrue(metadatas[i].getName().
           endsWith(blocks[i].getGenerationStamp() + ".meta"));
-      replicaContents[i] = readReplica(replicas[i]);
+      replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]);
     }
 
     int cellsNum = (fileLen - 1) / cellSize + 1;
     int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
-    try {
-      DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        /*
-         * Kill the datanode which contains one replica
-         * We need to make sure it dead in namenode: clear its update time and
-         * trigger NN to check heartbeat.
-         */
-        DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
-        dn.shutdown();
-        dnIDs[i] = dn.getDatanodeId();
-      }
-      setDataNodesDead(dnIDs);
-
-      // Check the locatedBlocks of the file again
-      locatedBlocks = getLocatedBlocks(file);
-      lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-      storageInfos = lastBlock.getLocations();
-      assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
-
-      int[] targetDNs = new int[dnNum - groupSize];
-      n = 0;
-      for (int i = 0; i < dnNum; i++) {
-        if (!bitset.get(i)) { // not contain replica of the block.
-          targetDNs[n++] = i;
-        }
-      }
-
-      waitForRecoveryFinished(file, groupSize);
-
-      targetDNs = sortTargetsByReplicas(blocks, targetDNs);
-
-      // Check the replica on the new target node.
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
-        File metadataAfterRecovery =
-            cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
-        assertEquals(replicaAfterRecovery.length(), replicas[i].length());
-        assertTrue(metadataAfterRecovery.getName().
-            endsWith(blocks[i].getGenerationStamp() + ".meta"));
-        byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
-        Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
-      }
-    } finally {
-      for (int i = 0; i < toRecoverBlockNum; i++) {
-        restartDataNode(toDead[i]);
-      }
-      cluster.waitActive();
-    }
-    fs.delete(file, true);
-  }
-
-  private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
-    for (DatanodeID dn : dnIDs) {
-      DatanodeDescriptor dnd =
-          NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
-      DFSTestUtil.setDatanodeDead(dnd);
-    }
-    BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
-  }
-
-  private void restartDataNode(int dn) {
-    try {
-      cluster.restartDataNode(dn, true, true);
-    } catch (IOException e) {
-    }
-  }
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      /*
+       * Kill the datanode which contains one replica
+       * We need to make sure it dead in namenode: clear its update time and
+       * trigger NN to check heartbeat.
+       */
+      DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
+      dn.shutdown();
+      cluster.setDataNodeDead(dn.getDatanodeId());
+    }
+
+    // Check the locatedBlocks of the file again
+    locatedBlocks = getLocatedBlocks(file);
+    lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+    storageInfos = lastBlock.getLocations();
+    assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+
+    int[] targetDNs = new int[dnNum - groupSize];
+    n = 0;
+    for (int i = 0; i < dnNum; i++) {
+      if (!bitset.get(i)) { // not contain replica of the block.
+        targetDNs[n++] = i;
+      }
+    }
+
+    waitForRecoveryFinished(file, groupSize);
+
+    targetDNs = sortTargetsByReplicas(blocks, targetDNs);
+
+    // Check the replica on the new target node.
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+      File metadataAfterRecovery =
+          cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
+      assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+      assertTrue(metadataAfterRecovery.getName().
+          endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      byte[] replicaContentAfterRecovery =
+          DFSTestUtil.readFileAsBytes(replicaAfterRecovery);
+      Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
+    }
+  }
 
   private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
     int[] result = new int[blocks.length];
     for (int i = 0; i < blocks.length; i++) {
@@ -347,31 +320,7 @@ private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
     }
     return result;
   }
 
-  private byte[] readReplica(File replica) throws IOException {
-    int length = (int)replica.length();
-    ByteArrayOutputStream content = new ByteArrayOutputStream(length);
-    FileInputStream in = new FileInputStream(replica);
-    try {
-      byte[] buffer = new byte[1024];
-      int total = 0;
-      while (total < length) {
-        int n = in.read(buffer);
-        if (n <= 0) {
-          break;
-        }
-        content.write(buffer, 0, n);
-        total += n;
-      }
-      if (total < length) {
-        Assert.fail("Failed to read all content of replica");
-      }
-      return content.toByteArray();
-    } finally {
-      in.close();
-    }
-  }
-
   private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize)
       throws Exception {
     final int ATTEMPTS = 60;

View File

@@ -46,8 +46,8 @@ public class TestSafeModeWithStripedFile {
   static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   static final int blockSize = cellSize * 2;
-  static MiniDFSCluster cluster;
-  static Configuration conf;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
 
   @Before
   public void setup() throws IOException {
@@ -57,7 +57,6 @@ public void setup() throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     cluster.waitActive();
-
   }
 
   @After

View File

@@ -47,11 +47,11 @@
 public class TestWriteReadStripedFile {
   public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem fs;
   private static int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private static short dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
 
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
@@ -64,6 +64,8 @@ public class TestWriteReadStripedFile {
   @Before
   public void setup() throws IOException {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     fs = cluster.getFileSystem();
     fs.mkdirs(new Path("/ec"));

View File

@@ -38,9 +38,9 @@
 public class TestWriteStripedFileWithFailure {
   public static final Log LOG = LogFactory
       .getLog(TestWriteStripedFileWithFailure.class);
-  private static MiniDFSCluster cluster;
-  private static FileSystem fs;
-  private static Configuration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+  private Configuration conf = new HdfsConfiguration();
 
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);

View File

@@ -36,8 +36,8 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS {
   private final static int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final static int stripesPerBlock = 4;
   private final static int numDNs = dataBlocks + parityBlocks + 2;
-  private static MiniDFSCluster cluster;
-  private static Configuration conf;
+  private MiniDFSCluster cluster;
+  private Configuration conf;
 
   {
     BLOCK_SIZE = cellSize * stripesPerBlock;

View File

@@ -21,30 +21,41 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.Test;
 
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestRecoverStripedBlocks {
+  private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final short GROUP_SIZE =
       (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
   private MiniDFSCluster cluster;
   private final Path dirPath = new Path("/dir");
   private Path filePath = new Path(dirPath, "file");
@@ -166,4 +177,63 @@ private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void test2RecoveryTasksForSameBlockGroup() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      fs.getClient().setErasureCodingPolicy("/", null);
+      int fileLen = NUM_DATA_BLOCKS * blockSize;
+      Path p = new Path("/test2RecoveryTasksForSameBlockGroup");
+      final byte[] data = new byte[fileLen];
+      DFSTestUtil.writeFile(fs, p, data);
+
+      LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient()
+          .getLocatedBlocks(p.toString(), 0).get(0);
+      LocatedBlock[] lbs = StripedBlockUtil.parseStripedBlockGroup(lb,
+          cellSize, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
+      assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(0, bm.getPendingReplicationBlocksCount());
+
+      // missing 1 block, so 1 task should be scheduled
+      DatanodeInfo dn0 = lbs[0].getLocations()[0];
+      cluster.stopDataNode(dn0.getName());
+      cluster.setDataNodeDead(dn0);
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(1, bm.getPendingReplicationBlocksCount());
+
+      // missing another block, but no new task should be scheduled because
+      // previous task isn't finished.
+      DatanodeInfo dn1 = lbs[1].getLocations()[0];
+      cluster.stopDataNode(dn1.getName());
+      cluster.setDataNodeDead(dn1);
+      BlockManagerTestUtil.getComputedDatanodeWork(bm);
+      assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
+      assertEquals(1, bm.getPendingReplicationBlocksCount());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static int getNumberOfBlocksToBeErasureCoded(MiniDFSCluster cluster)
+      throws Exception {
+    DatanodeManager dm =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    int count = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeDescriptor dd = dm.getDatanode(dn.getDatanodeId());
+      count += dd.getNumberOfBlocksToBeErasureCoded();
+    }
+    return count;
+  }
 }
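The new test2RecoveryTasksForSameBlockGroup test checks the invariant end to end: after the first DataNode is stopped, exactly one erasure-coding task is pending, and stopping a second DataNode in the same block group does not add a second task while the first is still outstanding. The same expectation, expressed against the simplified RecoveryScheduler sketched earlier (hypothetical names only, not the MiniDFSCluster-based test):

// Run with "java -ea" so the asserts are checked; assumes the RecoveryScheduler
// sketch shown earlier is on the classpath.
public class RecoverySchedulerCheck {
  public static void main(String[] args) {
    RecoveryScheduler scheduler = new RecoveryScheduler();
    long blockGroupId = 42L;

    // first missing block in the group: one recovery task is scheduled
    assert scheduler.scheduleRecovery(blockGroupId) != null;
    // second missing block while the first task is still pending: skipped
    assert scheduler.scheduleRecovery(blockGroupId) == null;

    // once the first task finishes, a new one may be scheduled for the group
    scheduler.recoveryFinished(blockGroupId);
    assert scheduler.scheduleRecovery(blockGroupId) != null;
  }
}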