HDFS-9816. Erasure Coding: allow to use multiple EC policies in striping related tests [Part 3]. Contributed by Rui Li.
Change-Id: I64b57bab4722cdc6e1e3148c3a3a401370249afe
This commit is contained in:
parent
647a35e996
commit
def754ec06
|
@ -171,6 +171,30 @@ public class StripedBlockUtil {
|
||||||
+ lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
|
+ lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the safe length given the internal block lengths.
|
||||||
|
*
|
||||||
|
* @param ecPolicy The EC policy used for the block group
|
||||||
|
* @param blockLens The lengths of internal blocks
|
||||||
|
* @return The safe length
|
||||||
|
*/
|
||||||
|
public static long getSafeLength(ErasureCodingPolicy ecPolicy,
|
||||||
|
long[] blockLens) {
|
||||||
|
final int cellSize = ecPolicy.getCellSize();
|
||||||
|
final int dataBlkNum = ecPolicy.getNumDataUnits();
|
||||||
|
Preconditions.checkArgument(blockLens.length >= dataBlkNum);
|
||||||
|
final int stripeSize = dataBlkNum * cellSize;
|
||||||
|
long[] cpy = Arrays.copyOf(blockLens, blockLens.length);
|
||||||
|
Arrays.sort(cpy);
|
||||||
|
// full stripe is a stripe has at least dataBlkNum full cells.
|
||||||
|
// lastFullStripeIdx is the index of the last full stripe.
|
||||||
|
int lastFullStripeIdx =
|
||||||
|
(int) (cpy[cpy.length - dataBlkNum] / cellSize);
|
||||||
|
return lastFullStripeIdx * stripeSize; // return the safeLength
|
||||||
|
// TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists
|
||||||
|
// such a stripe (and it must be partial).
|
||||||
|
}
|
||||||
|
|
||||||
private static int lastCellSize(int size, int cellSize, int numDataBlocks,
|
private static int lastCellSize(int size, int cellSize, int numDataBlocks,
|
||||||
int i) {
|
int i) {
|
||||||
if (i < numDataBlocks) {
|
if (i < numDataBlocks) {
|
||||||
|
|
|
@ -947,6 +947,9 @@ Trunk (Unreleased)
|
||||||
HDFS-9830. Remove references to hftp in ViewFs documentation in trunk.
|
HDFS-9830. Remove references to hftp in ViewFs documentation in trunk.
|
||||||
(Wei-Chiu Chuang via aajisaka)
|
(Wei-Chiu Chuang via aajisaka)
|
||||||
|
|
||||||
|
HDFS-9816. Erasure Coding: allow to use multiple EC policies in striping
|
||||||
|
related tests [Part 3]. (Rui Li via zhz)
|
||||||
|
|
||||||
Release 2.9.0 - UNRELEASED
|
Release 2.9.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -513,24 +514,15 @@ public class BlockRecoveryWorker {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
|
long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
|
||||||
final int cellSize = ecPolicy.getCellSize();
|
|
||||||
final int dataBlkNum = ecPolicy.getNumDataUnits();
|
final int dataBlkNum = ecPolicy.getNumDataUnits();
|
||||||
Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum);
|
Preconditions.checkArgument(syncBlocks.size() >= dataBlkNum);
|
||||||
final int stripeSize = dataBlkNum * cellSize;
|
|
||||||
long[] blockLengths = new long[syncBlocks.size()];
|
long[] blockLengths = new long[syncBlocks.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (BlockRecord r : syncBlocks.values()) {
|
for (BlockRecord r : syncBlocks.values()) {
|
||||||
ReplicaRecoveryInfo rInfo = r.getReplicaRecoveryInfo();
|
ReplicaRecoveryInfo rInfo = r.getReplicaRecoveryInfo();
|
||||||
blockLengths[i++] = rInfo.getNumBytes();
|
blockLengths[i++] = rInfo.getNumBytes();
|
||||||
}
|
}
|
||||||
Arrays.sort(blockLengths);
|
return StripedBlockUtil.getSafeLength(ecPolicy, blockLengths);
|
||||||
// full stripe is a stripe has at least dataBlkNum full cells.
|
|
||||||
// lastFullStripeIdx is the index of the last full stripe.
|
|
||||||
int lastFullStripeIdx =
|
|
||||||
(int) (blockLengths[blockLengths.length - dataBlkNum] / cellSize);
|
|
||||||
return lastFullStripeIdx * stripeSize; // return the safeLength
|
|
||||||
// TODO: Include lastFullStripeIdx+1 stripe in safeLength, if there exists
|
|
||||||
// such a stripe (and it must be partial).
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkLocations(int locationCount)
|
private void checkLocations(int locationCount)
|
||||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
@ -45,12 +47,16 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class TestLeaseRecoveryStriped {
|
public class TestLeaseRecoveryStriped {
|
||||||
public static final Log LOG = LogFactory
|
public static final Log LOG = LogFactory
|
||||||
.getLog(TestLeaseRecoveryStriped.class);
|
.getLog(TestLeaseRecoveryStriped.class);
|
||||||
|
|
||||||
|
private static final ErasureCodingPolicy ecPolicy =
|
||||||
|
StripedFileTestUtil.TEST_EC_POLICY;
|
||||||
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
|
||||||
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
private static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
|
||||||
private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
|
private static final int CELL_SIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
|
||||||
|
@ -90,7 +96,7 @@ public class TestLeaseRecoveryStriped {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
dfs.mkdirs(dir);
|
dfs.mkdirs(dir);
|
||||||
dfs.setErasureCodingPolicy(dir, null);
|
dfs.setErasureCodingPolicy(dir, ecPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -100,26 +106,38 @@ public class TestLeaseRecoveryStriped {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final int[][][] BLOCK_LENGTHS_SUITE = {
|
private static int[][][] getBlockLengthsSuite() {
|
||||||
{ { 11 * CELL_SIZE, 10 * CELL_SIZE, 9 * CELL_SIZE, 8 * CELL_SIZE,
|
final int groups = 4;
|
||||||
7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE, 4 * CELL_SIZE,
|
final int minNumCell = 3;
|
||||||
3 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
final int maxNumCell = 11;
|
||||||
|
final int minNumDelta = -4;
|
||||||
|
final int maxNumDelta = 2;
|
||||||
|
int delta = 0;
|
||||||
|
int[][][] blkLenSuite = new int[groups][][];
|
||||||
|
Random random = ThreadLocalRandom.current();
|
||||||
|
for (int i = 0; i < blkLenSuite.length; i++) {
|
||||||
|
if (i == blkLenSuite.length - 1) {
|
||||||
|
delta = bytesPerChecksum;
|
||||||
|
}
|
||||||
|
int[][] suite = new int[2][];
|
||||||
|
int[] lens = new int[NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS];
|
||||||
|
long[] lenInLong = new long[lens.length];
|
||||||
|
for (int j = 0; j < lens.length; j++) {
|
||||||
|
int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
|
||||||
|
int numDelta = j < NUM_DATA_BLOCKS ?
|
||||||
|
random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta : 0;
|
||||||
|
lens[j] = CELL_SIZE * numCell + delta * numDelta;
|
||||||
|
lenInLong[j] = lens[j];
|
||||||
|
}
|
||||||
|
suite[0] = lens;
|
||||||
|
suite[1] = new int[]{
|
||||||
|
(int) StripedBlockUtil.getSafeLength(ecPolicy, lenInLong)};
|
||||||
|
blkLenSuite[i] = suite;
|
||||||
|
}
|
||||||
|
return blkLenSuite;
|
||||||
|
}
|
||||||
|
|
||||||
{ { 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE, 6 * CELL_SIZE,
|
private static final int[][][] BLOCK_LENGTHS_SUITE = getBlockLengthsSuite();
|
||||||
7 * CELL_SIZE, 8 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
||||||
11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
|
||||||
|
|
||||||
{ { 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE,
|
|
||||||
4 * CELL_SIZE, 2 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
||||||
11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
|
||||||
|
|
||||||
{ { 8 * CELL_SIZE + bytesPerChecksum,
|
|
||||||
7 * CELL_SIZE + bytesPerChecksum * 2,
|
|
||||||
6 * CELL_SIZE + bytesPerChecksum * 2,
|
|
||||||
5 * CELL_SIZE - bytesPerChecksum * 3,
|
|
||||||
4 * CELL_SIZE - bytesPerChecksum * 4,
|
|
||||||
3 * CELL_SIZE - bytesPerChecksum * 4, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
|
||||||
11 * CELL_SIZE }, { 36 * CELL_SIZE } }, };
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLeaseRecovery() throws Exception {
|
public void testLeaseRecovery() throws Exception {
|
||||||
|
|
|
@ -111,37 +111,36 @@ public class TestUnderReplicatedBlockQueues {
|
||||||
int groupSize = dataBlkNum + parityBlkNum;
|
int groupSize = dataBlkNum + parityBlkNum;
|
||||||
long numBytes = ecPolicy.getCellSize() * dataBlkNum;
|
long numBytes = ecPolicy.getCellSize() * dataBlkNum;
|
||||||
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
|
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
|
||||||
|
int numUR = 0;
|
||||||
|
int numCorrupt = 0;
|
||||||
|
|
||||||
// add a striped block which been left NUM_DATA_BLOCKS internal blocks
|
// add under replicated blocks
|
||||||
BlockInfo block1 = genStripedBlockInfo(-100, numBytes);
|
for (int i = 0; dataBlkNum + i < groupSize; i++) {
|
||||||
assertAdded(queues, block1, dataBlkNum, 0, groupSize);
|
BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
|
||||||
assertEquals(1, queues.getUnderReplicatedBlockCount());
|
assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
|
||||||
assertEquals(1, queues.size());
|
numUR++;
|
||||||
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
|
assertEquals(numUR, queues.getUnderReplicatedBlockCount());
|
||||||
|
assertEquals(numUR + numCorrupt, queues.size());
|
||||||
// add a striped block which been left NUM_DATA_BLOCKS+1 internal blocks
|
if (i == 0) {
|
||||||
BlockInfo block2 = genStripedBlockInfo(-200, numBytes);
|
assertInLevel(queues, block,
|
||||||
assertAdded(queues, block2, dataBlkNum + 1, 0, groupSize);
|
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
|
||||||
assertEquals(2, queues.getUnderReplicatedBlockCount());
|
} else if (i * 3 < parityBlkNum + 1) {
|
||||||
assertEquals(2, queues.size());
|
assertInLevel(queues, block,
|
||||||
assertInLevel(queues, block2,
|
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
|
||||||
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
|
} else {
|
||||||
|
assertInLevel(queues, block,
|
||||||
// add a striped block which been left NUM_DATA_BLOCKS+2 internal blocks
|
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
|
||||||
BlockInfo block3 = genStripedBlockInfo(-300, numBytes);
|
}
|
||||||
assertAdded(queues, block3, dataBlkNum + 2, 0, groupSize);
|
}
|
||||||
assertEquals(3, queues.getUnderReplicatedBlockCount());
|
|
||||||
assertEquals(3, queues.size());
|
|
||||||
assertInLevel(queues, block3,
|
|
||||||
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
|
|
||||||
|
|
||||||
// add a corrupted block
|
// add a corrupted block
|
||||||
BlockInfo block_corrupt = genStripedBlockInfo(-400, numBytes);
|
BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
|
||||||
assertEquals(0, queues.getCorruptBlockSize());
|
assertEquals(numCorrupt, queues.getCorruptBlockSize());
|
||||||
assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
|
assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
|
||||||
assertEquals(4, queues.size());
|
numCorrupt++;
|
||||||
assertEquals(3, queues.getUnderReplicatedBlockCount());
|
assertEquals(numUR + numCorrupt, queues.size());
|
||||||
assertEquals(1, queues.getCorruptBlockSize());
|
assertEquals(numUR, queues.getUnderReplicatedBlockCount());
|
||||||
|
assertEquals(numCorrupt, queues.getCorruptBlockSize());
|
||||||
assertInLevel(queues, block_corrupt,
|
assertInLevel(queues, block_corrupt,
|
||||||
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
@ -98,8 +99,6 @@ import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.TestLeaseRecoveryStriped.BLOCK_LENGTHS_SUITE;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests if sync all replicas in block recovery works correctly
|
* This tests if sync all replicas in block recovery works correctly
|
||||||
*/
|
*/
|
||||||
|
@ -125,6 +124,30 @@ public class TestBlockRecovery {
|
||||||
private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
|
private final static ExtendedBlock block = new ExtendedBlock(POOL_ID,
|
||||||
BLOCK_ID, BLOCK_LEN, GEN_STAMP);
|
BLOCK_ID, BLOCK_LEN, GEN_STAMP);
|
||||||
|
|
||||||
|
private static final int CELL_SIZE =
|
||||||
|
StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
|
||||||
|
private static final int bytesPerChecksum = 512;
|
||||||
|
private static final int[][][] BLOCK_LENGTHS_SUITE = {
|
||||||
|
{ { 11 * CELL_SIZE, 10 * CELL_SIZE, 9 * CELL_SIZE, 8 * CELL_SIZE,
|
||||||
|
7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE, 4 * CELL_SIZE,
|
||||||
|
3 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
||||||
|
|
||||||
|
{ { 3 * CELL_SIZE, 4 * CELL_SIZE, 5 * CELL_SIZE, 6 * CELL_SIZE,
|
||||||
|
7 * CELL_SIZE, 8 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
||||||
|
11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
||||||
|
|
||||||
|
{ { 11 * CELL_SIZE, 7 * CELL_SIZE, 6 * CELL_SIZE, 5 * CELL_SIZE,
|
||||||
|
4 * CELL_SIZE, 2 * CELL_SIZE, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
||||||
|
11 * CELL_SIZE }, { 36 * CELL_SIZE } },
|
||||||
|
|
||||||
|
{ { 8 * CELL_SIZE + bytesPerChecksum,
|
||||||
|
7 * CELL_SIZE + bytesPerChecksum * 2,
|
||||||
|
6 * CELL_SIZE + bytesPerChecksum * 2,
|
||||||
|
5 * CELL_SIZE - bytesPerChecksum * 3,
|
||||||
|
4 * CELL_SIZE - bytesPerChecksum * 4,
|
||||||
|
3 * CELL_SIZE - bytesPerChecksum * 4, 9 * CELL_SIZE, 10 * CELL_SIZE,
|
||||||
|
11 * CELL_SIZE }, { 36 * CELL_SIZE } }, };
|
||||||
|
|
||||||
static {
|
static {
|
||||||
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.ALL);
|
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.ALL);
|
||||||
GenericTestUtils.setLogLevel(LOG, Level.ALL);
|
GenericTestUtils.setLogLevel(LOG, Level.ALL);
|
||||||
|
|
|
@ -104,11 +104,11 @@ public class TestAddOverReplicatedStripedBlocks {
|
||||||
cluster.triggerBlockReports();
|
cluster.triggerBlockReports();
|
||||||
|
|
||||||
// let a internal block be over replicated with 2 redundant blocks.
|
// let a internal block be over replicated with 2 redundant blocks.
|
||||||
blk.setBlockId(groupId + 2);
|
blk.setBlockId(groupId);
|
||||||
cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
|
cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
|
||||||
cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
|
cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
|
||||||
// let a internal block be over replicated with 1 redundant block.
|
// let a internal block be over replicated with 1 redundant block.
|
||||||
blk.setBlockId(groupId + 6);
|
blk.setBlockId(groupId + DATA_BLK_NUM);
|
||||||
cluster.injectBlocks(numDNs - 1, Arrays.asList(blk), bpid);
|
cluster.injectBlocks(numDNs - 1, Arrays.asList(blk), bpid);
|
||||||
|
|
||||||
// update blocksMap
|
// update blocksMap
|
||||||
|
@ -143,10 +143,9 @@ public class TestAddOverReplicatedStripedBlocks {
|
||||||
cluster.triggerBlockReports();
|
cluster.triggerBlockReports();
|
||||||
List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
|
List<DatanodeInfo> infos = Arrays.asList(bg.getLocations());
|
||||||
|
|
||||||
// let a internal block be over replicated with 2 redundant blocks.
|
// let a internal block be over replicated with (numDNs - GROUP_SIZE + 1)
|
||||||
// Therefor number of internal blocks is over GROUP_SIZE. (5 data blocks +
|
// redundant blocks. Therefor number of internal blocks is over GROUP_SIZE.
|
||||||
// 3 parity blocks + 2 redundant blocks > GROUP_SIZE)
|
blk.setBlockId(groupId);
|
||||||
blk.setBlockId(groupId + 2);
|
|
||||||
List<DataNode> dataNodeList = cluster.getDataNodes();
|
List<DataNode> dataNodeList = cluster.getDataNodes();
|
||||||
for (int i = 0; i < numDNs; i++) {
|
for (int i = 0; i < numDNs; i++) {
|
||||||
if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
|
if (!infos.contains(dataNodeList.get(i).getDatanodeId())) {
|
||||||
|
|
|
@ -454,7 +454,7 @@ public class TestAddStripedBlocks {
|
||||||
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
|
reports = DFSTestUtil.makeReportForReceivedBlock(reported,
|
||||||
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
|
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
|
||||||
ns.processIncrementalBlockReport(
|
ns.processIncrementalBlockReport(
|
||||||
cluster.getDataNodes().get(5).getDatanodeId(), reports[0]);
|
cluster.getDataNodes().get(0).getDatanodeId(), reports[0]);
|
||||||
BlockManagerTestUtil.updateState(ns.getBlockManager());
|
BlockManagerTestUtil.updateState(ns.getBlockManager());
|
||||||
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
|
Assert.assertEquals(1, ns.getCorruptReplicaBlocks());
|
||||||
Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
|
Assert.assertEquals(3, bm.getCorruptReplicas(stored).size());
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class TestFSEditLogLoader {
|
||||||
private static final int NUM_DATA_NODES = 0;
|
private static final int NUM_DATA_NODES = 0;
|
||||||
|
|
||||||
private static final ErasureCodingPolicy testECPolicy
|
private static final ErasureCodingPolicy testECPolicy
|
||||||
= ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
= StripedFileTestUtil.TEST_EC_POLICY;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDisplayRecentEditLogOpCodes() throws IOException {
|
public void testDisplayRecentEditLogOpCodes() throws IOException {
|
||||||
|
@ -454,7 +454,8 @@ public class TestFSEditLogLoader {
|
||||||
|
|
||||||
//set the storage policy of the directory
|
//set the storage policy of the directory
|
||||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||||
fs.getClient().getNamenode().setErasureCodingPolicy(testDir, null);
|
fs.getClient().getNamenode().setErasureCodingPolicy(
|
||||||
|
testDir, testECPolicy);
|
||||||
|
|
||||||
// Create a file with striped block
|
// Create a file with striped block
|
||||||
Path p = new Path(testFilePath);
|
Path p = new Path(testFilePath);
|
||||||
|
@ -526,7 +527,8 @@ public class TestFSEditLogLoader {
|
||||||
|
|
||||||
//set the storage policy of the directory
|
//set the storage policy of the directory
|
||||||
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
fs.mkdir(new Path(testDir), new FsPermission("755"));
|
||||||
fs.getClient().getNamenode().setErasureCodingPolicy(testDir, null);
|
fs.getClient().getNamenode().setErasureCodingPolicy(
|
||||||
|
testDir, testECPolicy);
|
||||||
|
|
||||||
//create a file with striped blocks
|
//create a file with striped blocks
|
||||||
Path p = new Path(testFilePath);
|
Path p = new Path(testFilePath);
|
||||||
|
|
|
@ -143,7 +143,7 @@ public class TestFSImage {
|
||||||
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
|
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
|
||||||
boolean isUC) throws IOException{
|
boolean isUC) throws IOException{
|
||||||
// contruct a INode with StripedBlock for saving and loading
|
// contruct a INode with StripedBlock for saving and loading
|
||||||
fsn.setErasureCodingPolicy("/", null, false);
|
fsn.setErasureCodingPolicy("/", testECPolicy, false);
|
||||||
long id = 123456789;
|
long id = 123456789;
|
||||||
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
|
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
|
||||||
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
|
PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
|
||||||
|
@ -439,8 +439,8 @@ public class TestFSImage {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSupportBlockGroup() throws IOException {
|
public void testSupportBlockGroup() throws IOException {
|
||||||
final short GROUP_SIZE = (short) (StripedFileTestUtil.NUM_DATA_BLOCKS
|
final short GROUP_SIZE = (short) (testECPolicy.getNumDataUnits() +
|
||||||
+ StripedFileTestUtil.NUM_PARITY_BLOCKS);
|
testECPolicy.getNumParityUnits());
|
||||||
final int BLOCK_SIZE = 8 * 1024 * 1024;
|
final int BLOCK_SIZE = 8 * 1024 * 1024;
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
@ -450,7 +450,7 @@ public class TestFSImage {
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
DistributedFileSystem fs = cluster.getFileSystem();
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
fs.getClient().getNamenode().setErasureCodingPolicy("/", null);
|
fs.getClient().getNamenode().setErasureCodingPolicy("/", testECPolicy);
|
||||||
Path file = new Path("/striped");
|
Path file = new Path("/striped");
|
||||||
FSDataOutputStream out = fs.create(file);
|
FSDataOutputStream out = fs.create(file);
|
||||||
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
|
byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE);
|
||||||
|
@ -472,10 +472,14 @@ public class TestFSImage {
|
||||||
BlockInfo[] blks = inode.getBlocks();
|
BlockInfo[] blks = inode.getBlocks();
|
||||||
assertEquals(1, blks.length);
|
assertEquals(1, blks.length);
|
||||||
assertTrue(blks[0].isStriped());
|
assertTrue(blks[0].isStriped());
|
||||||
assertEquals(StripedFileTestUtil.NUM_DATA_BLOCKS, ((BlockInfoStriped)blks[0]).getDataBlockNum());
|
assertEquals(testECPolicy.getNumDataUnits(),
|
||||||
assertEquals(StripedFileTestUtil.NUM_PARITY_BLOCKS, ((BlockInfoStriped)blks[0]).getParityBlockNum());
|
((BlockInfoStriped) blks[0]).getDataBlockNum());
|
||||||
|
assertEquals(testECPolicy.getNumParityUnits(),
|
||||||
|
((BlockInfoStriped) blks[0]).getParityBlockNum());
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -654,8 +655,11 @@ public class TestFsck {
|
||||||
setNumFiles(4).build();
|
setNumFiles(4).build();
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(10)
|
ErasureCodingPolicy ecPolicy =
|
||||||
.build();
|
ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||||
|
int numAllUnits = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
||||||
|
numAllUnits + 1).build();
|
||||||
FileSystem fs = null;
|
FileSystem fs = null;
|
||||||
try {
|
try {
|
||||||
String topDir = "/myDir";
|
String topDir = "/myDir";
|
||||||
|
@ -666,7 +670,8 @@ public class TestFsck {
|
||||||
fs = cluster.getFileSystem();
|
fs = cluster.getFileSystem();
|
||||||
util.createFiles(fs, topDir);
|
util.createFiles(fs, topDir);
|
||||||
// set topDir to EC when it has replicated files
|
// set topDir to EC when it has replicated files
|
||||||
cluster.getFileSystem().getClient().setErasureCodingPolicy(topDir, null);
|
cluster.getFileSystem().getClient().setErasureCodingPolicy(
|
||||||
|
topDir, ecPolicy);
|
||||||
|
|
||||||
// create a new file under topDir
|
// create a new file under topDir
|
||||||
DFSTestUtil.createFile(fs, new Path(topDir, "ecFile"), 1024, (short) 1, 0L);
|
DFSTestUtil.createFile(fs, new Path(topDir, "ecFile"), 1024, (short) 1, 0L);
|
||||||
|
@ -687,16 +692,16 @@ public class TestFsck {
|
||||||
"-blocks", "-openforwrite");
|
"-blocks", "-openforwrite");
|
||||||
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
||||||
assertTrue(outStr.contains("OPENFORWRITE"));
|
assertTrue(outStr.contains("OPENFORWRITE"));
|
||||||
assertTrue(outStr.contains("Live_repl=9"));
|
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
|
||||||
assertTrue(outStr.contains("Expected_repl=9"));
|
assertTrue(outStr.contains("Expected_repl=" + numAllUnits));
|
||||||
|
|
||||||
// Use -openforwrite option to list open files
|
// Use -openforwrite option to list open files
|
||||||
outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
|
outStr = runFsck(conf, 0, true, openFile.toString(), "-files", "-blocks",
|
||||||
"-locations", "-openforwrite", "-replicaDetails");
|
"-locations", "-openforwrite", "-replicaDetails");
|
||||||
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
||||||
assertTrue(outStr.contains("OPENFORWRITE"));
|
assertTrue(outStr.contains("OPENFORWRITE"));
|
||||||
assertTrue(outStr.contains("Live_repl=9"));
|
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
|
||||||
assertTrue(outStr.contains("Expected_repl=9"));
|
assertTrue(outStr.contains("Expected_repl=" + numAllUnits));
|
||||||
assertTrue(outStr.contains("Under Construction Block:"));
|
assertTrue(outStr.contains("Under Construction Block:"));
|
||||||
|
|
||||||
// Close the file
|
// Close the file
|
||||||
|
@ -708,8 +713,8 @@ public class TestFsck {
|
||||||
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
|
||||||
assertFalse(outStr.contains("OPENFORWRITE"));
|
assertFalse(outStr.contains("OPENFORWRITE"));
|
||||||
assertFalse(outStr.contains("Under Construction Block:"));
|
assertFalse(outStr.contains("Under Construction Block:"));
|
||||||
assertFalse(outStr.contains("Expected_repl=9"));
|
assertFalse(outStr.contains("Expected_repl=" + numAllUnits));
|
||||||
assertTrue(outStr.contains("Live_repl=9"));
|
assertTrue(outStr.contains("Live_repl=" + numAllUnits));
|
||||||
util.cleanup(fs, topDir);
|
util.cleanup(fs, topDir);
|
||||||
} finally {
|
} finally {
|
||||||
if (fs != null) {
|
if (fs != null) {
|
||||||
|
|
|
@ -77,13 +77,10 @@ public class TestReconstructStripedBlocks {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMissingStripedBlockWithBusyNode1() throws Exception {
|
public void testMissingStripedBlockWithBusyNode() throws Exception {
|
||||||
doTestMissingStripedBlock(2, 1);
|
for (int i = 1; i <= NUM_PARITY_BLOCKS; i++) {
|
||||||
}
|
doTestMissingStripedBlock(i, 1);
|
||||||
|
}
|
||||||
@Test
|
|
||||||
public void testMissingStripedBlockWithBusyNode2() throws Exception {
|
|
||||||
doTestMissingStripedBlock(3, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -153,7 +153,7 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>SubstringComparator</type>
|
<type>SubstringComparator</type>
|
||||||
<expected-output>ErasureCodingPolicy=[Name=RS-6-3-64k</expected-output>
|
<expected-output>ErasureCodingPolicy=[Name=</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
@ -292,7 +292,7 @@
|
||||||
<comparators>
|
<comparators>
|
||||||
<comparator>
|
<comparator>
|
||||||
<type>SubstringComparator</type>
|
<type>SubstringComparator</type>
|
||||||
<expected-output>Policy 'invalidpolicy' does not match any of the supported policies. Please select any one of [RS-6-3-64k]</expected-output>
|
<expected-output>Policy 'invalidpolicy' does not match any of the supported policies. Please select any one of [</expected-output>
|
||||||
</comparator>
|
</comparator>
|
||||||
</comparators>
|
</comparators>
|
||||||
</test>
|
</test>
|
||||||
|
|
Loading…
Reference in New Issue