HDFS-9658. Erasure Coding: allow to use multiple EC policies in striping related tests. Contributed by Rui Li.

Change-Id: I5b02f5bde4d343b7529c6a7fe5df73bd99c6cb24
zhezhang 2016-02-03 14:40:44 -08:00
parent 48b76c8301
commit a0f5e83f28
5 changed files with 117 additions and 56 deletions

CHANGES.txt

@@ -918,6 +918,9 @@ Trunk (Unreleased)
     HDFS-9403. Erasure coding: some EC tests are missing timeout.
     (Gao Rui via zhz)

+    HDFS-9658. Erasure Coding: allow to use multiple EC policies in striping
+    related tests. (Rui Li via zhz)
+
 Release 2.9.0 - UNRELEASED

   INCOMPATIBLE CHANGES

StripedFileTestUtil.java

@@ -26,10 +26,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
@@ -56,9 +58,14 @@ public class StripedFileTestUtil {
    * These values correspond to the values used by the system default erasure
    * coding policy.
    */
-  public static final short NUM_DATA_BLOCKS = (short) 6;
-  public static final short NUM_PARITY_BLOCKS = (short) 3;
-  public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
+  public static final ErasureCodingPolicy TEST_EC_POLICY =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  public static final short NUM_DATA_BLOCKS =
+      (short) TEST_EC_POLICY.getNumDataUnits();
+  public static final short NUM_PARITY_BLOCKS =
+      (short) TEST_EC_POLICY.getNumParityUnits();
+  public static final int BLOCK_STRIPED_CELL_SIZE =
+      TEST_EC_POLICY.getCellSize();

   static int stripesPerBlock = 4;
   public static int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;

TestDFSStripedInputStream.java

@@ -60,7 +60,8 @@ public class TestDFSStripedInputStream {
   private DistributedFileSystem fs;
   private final Path dirPath = new Path("/striped");
   private Path filePath = new Path(dirPath, "file");
-  private final ErasureCodingPolicy ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
   private final short DATA_BLK_NUM = StripedFileTestUtil.NUM_DATA_BLOCKS;
   private final short PARITY_BLK_NUM = StripedFileTestUtil.NUM_PARITY_BLOCKS;
   private final int CELLSIZE = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@@ -182,7 +183,7 @@ public class TestDFSStripedInputStream {
   @Test
   public void testPreadWithDNFailure() throws Exception {
     final int numBlocks = 4;
-    final int failedDNIdx = 2;
+    final int failedDNIdx = DATA_BLK_NUM - 1;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
         NUM_STRIPE_PER_BLOCK, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
@ -200,11 +201,10 @@ public class TestDFSStripedInputStream {
} }
DFSStripedInputStream in = DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
ErasureCodingPolicyManager.getSystemDefaultPolicy(), null); ecPolicy, null);
int readSize = BLOCK_GROUP_SIZE; int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize]; byte[] readBuffer = new byte[readSize];
byte[] expected = new byte[readSize]; byte[] expected = new byte[readSize];
cluster.stopDataNode(failedDNIdx);
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
for (int j = 0; j < DATA_BLK_NUM; j++) { for (int j = 0; j < DATA_BLK_NUM; j++) {
@@ -221,20 +221,30 @@ public class TestDFSStripedInputStream {
         DATA_BLK_NUM, PARITY_BLK_NUM);

     // Update the expected content for decoded data
+    int[] missingBlkIdx = new int[PARITY_BLK_NUM];
+    for (int i = 0; i < missingBlkIdx.length; i++) {
+      if (i == 0) {
+        missingBlkIdx[i] = failedDNIdx;
+      } else {
+        missingBlkIdx[i] = DATA_BLK_NUM + i;
+      }
+    }
+    cluster.stopDataNode(failedDNIdx);
     for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
       byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
-      int[] missingBlkIdx = new int[]{failedDNIdx, 7, 8};
-      byte[][] decodeOutputs = new byte[PARITY_BLK_NUM][CELLSIZE];
+      byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
       for (int j = 0; j < DATA_BLK_NUM; j++) {
         int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
         if (j != failedDNIdx) {
           System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
         }
       }
-      for (int k = 0; k < CELLSIZE; k++) {
-        int posInBlk = i * CELLSIZE + k;
-        decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
-            new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
+      for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) {
+        for (int k = 0; k < CELLSIZE; k++) {
+          int posInBlk = i * CELLSIZE + k;
+          decodeInputs[j][k] = SimulatedFSDataset.simulatedByte(
+              new Block(bg.getBlock().getBlockId() + j), posInBlk);
+        }
       }
       for (int m : missingBlkIdx) {
         decodeInputs[m] = null;

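With the system default RS-6-3 policy, DATA_BLK_NUM is 6, so the new failedDNIdx evaluates to 5 and missingBlkIdx to {5, 7, 8}: the parity indices 7 and 8 match the values the test previously hard-coded, while the failed data-block index now follows the policy's width instead of being fixed at 2.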
TestDFSStripedOutputStreamWithFailure.java

@@ -17,16 +17,7 @@
  */
 package org.apache.hadoop.hdfs;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -52,7 +43,18 @@ import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;

-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 public class TestDFSStripedOutputStreamWithFailure {
@@ -73,8 +75,8 @@ public class TestDFSStripedOutputStreamWithFailure {
   private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
   private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;

-  private static final int FLUSH_POS
-      = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+  private static final int FLUSH_POS =
+      9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;

   static {
     System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS);
@@ -103,23 +105,60 @@ public class TestDFSStripedOutputStreamWithFailure {
     return lengths;
   }

-  private static final int[][] dnIndexSuite = {
-    {0, 1},
-    {0, 5},
-    {0, 6},
-    {0, 8},
-    {1, 5},
-    {1, 6},
-    {6, 8},
-    {0, 1, 2},
-    {3, 4, 5},
-    {0, 1, 6},
-    {0, 5, 6},
-    {0, 5, 8},
-    {0, 6, 7},
-    {5, 6, 7},
-    {6, 7, 8},
-  };
+  private static final int[][] dnIndexSuite = getDnIndexSuite();
+
+  private static int[][] getDnIndexSuite() {
+    final int maxNumLevel = 2;
+    final int maxPerLevel = 8;
+    List<List<Integer>> allLists = new ArrayList<>();
+    int numIndex = NUM_PARITY_BLOCKS;
+    for (int i = 0; i < maxNumLevel && numIndex > 1; i++) {
+      List<List<Integer>> lists =
+          combinations(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS, numIndex);
+      if (lists.size() > maxPerLevel) {
+        Collections.shuffle(lists);
+        lists = lists.subList(0, maxPerLevel);
+      }
+      allLists.addAll(lists);
+      numIndex--;
+    }
+    int[][] dnIndexSuite = new int[allLists.size()][];
+    for (int i = 0; i < dnIndexSuite.length; i++) {
+      int[] list = new int[allLists.get(i).size()];
+      for (int j = 0; j < list.length; j++) {
+        list[j] = allLists.get(i).get(j);
+      }
+      dnIndexSuite[i] = list;
+    }
+    return dnIndexSuite;
+  }
+
+  // get all combinations of k integers from {0,...,n-1}
+  private static List<List<Integer>> combinations(int n, int k) {
+    List<List<Integer>> res = new LinkedList<List<Integer>>();
+    if (k >= 1 && n >= k) {
+      getComb(n, k, new Stack<Integer>(), res);
+    }
+    return res;
+  }
+
+  private static void getComb(int n, int k, Stack<Integer> stack,
+      List<List<Integer>> res) {
+    if (stack.size() == k) {
+      List<Integer> list = new ArrayList<Integer>(stack);
+      res.add(list);
+    } else {
+      int next = stack.empty() ? 0 : stack.peek() + 1;
+      while (next < n) {
+        stack.push(next);
+        getComb(n, k, stack, res);
+        next++;
+      }
+    }
+    if (!stack.empty()) {
+      stack.pop();
+    }
+  }

   private int[] getKillPositions(int fileLen, int num) {
     int[] positions = new int[num];
@@ -193,10 +232,10 @@ public class TestDFSStripedOutputStreamWithFailure {
       conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
       // Set short retry timeouts so this test runs faster
       conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-      for (int dn = 0; dn < 9; dn += 2) {
+      for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn += 2) {
         try {
           setup(conf);
-          runTest(length, new int[]{length/2}, new int[]{dn}, true);
+          runTest(length, new int[]{length / 2}, new int[]{dn}, true);
         } catch (Exception e) {
           LOG.error("failed, dn=" + dn + ", length=" + length);
           throw e;
@@ -216,10 +255,10 @@ public class TestDFSStripedOutputStreamWithFailure {
       ArrayList<DataNode> dataNodes = cluster.getDataNodes();
       // shutdown few datanodes to avoid getting sufficient data blocks number
       // of datanodes
-      int killDns = dataNodes.size() / 2;
-      int numDatanodes = dataNodes.size() - killDns;
-      for (int i = 0; i < killDns; i++) {
-        cluster.stopDataNode(i);
+      int numDatanodes = dataNodes.size();
+      while (numDatanodes >= NUM_DATA_BLOCKS) {
+        cluster.stopDataNode(0);
+        numDatanodes--;
       }
       cluster.restartNameNodes();
       cluster.triggerHeartbeats();
@@ -235,8 +274,10 @@ public class TestDFSStripedOutputStreamWithFailure {
         Assert.fail("Failed to validate available dns against blkGroupSize");
       } catch (IOException ioe) {
         // expected
-        GenericTestUtils.assertExceptionContains("Failed to get 6 nodes from" +
-            " namenode: blockGroupSize= 9, blocks.length= 5", ioe);
+        GenericTestUtils.assertExceptionContains("Failed to get " +
+            NUM_DATA_BLOCKS + " nodes from namenode: blockGroupSize= " +
+            (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length= " +
+            numDatanodes, ioe);
       }
     } finally {
       tearDown();
@@ -274,7 +315,7 @@ public class TestDFSStripedOutputStreamWithFailure {
   void runTest(final int length) {
     final HdfsConfiguration conf = newHdfsConfiguration();
-    for (int dn = 0; dn < 9; dn++) {
+    for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn++) {
       try {
         LOG.info("runTest: dn=" + dn + ", length=" + length);
         setup(conf);

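As a side note, the combination enumeration that the new getDnIndexSuite() relies on can be exercised on its own. The sketch below mirrors the combinations()/getComb() helpers added above; the wrapper class and the main() driver are illustrative assumptions, not part of the patch.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;

// Standalone sketch of the k-combination enumeration used by getDnIndexSuite().
public class CombinationsSketch {

  // get all combinations of k integers from {0,...,n-1}
  static List<List<Integer>> combinations(int n, int k) {
    List<List<Integer>> res = new LinkedList<List<Integer>>();
    if (k >= 1 && n >= k) {
      getComb(n, k, new Stack<Integer>(), res);
    }
    return res;
  }

  static void getComb(int n, int k, Stack<Integer> stack,
      List<List<Integer>> res) {
    if (stack.size() == k) {
      // A full combination has been built on the stack; record a copy.
      res.add(new ArrayList<Integer>(stack));
    } else {
      // Only consider indices larger than the current top of the stack, so
      // each combination is generated exactly once, in increasing order.
      int next = stack.empty() ? 0 : stack.peek() + 1;
      while (next < n) {
        stack.push(next);
        getComb(n, k, stack, res);
        next++;
      }
    }
    if (!stack.empty()) {
      stack.pop();
    }
  }

  public static void main(String[] args) {
    // With the default RS-6-3 policy there are 6 + 3 = 9 datanode indices,
    // so the two-failure level draws from combinations(9, 2): 36 index pairs
    // such as [0, 1], [0, 2], ..., [7, 8].
    System.out.println(combinations(9, 2));
  }
}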
TestErasureCodingPolicyWithSnapshot.java

@@ -39,11 +39,11 @@ public class TestErasureCodingPolicyWithSnapshot {
   private DistributedFileSystem fs;
   private Configuration conf;
-  private final static short GROUP_SIZE = StripedFileTestUtil.NUM_DATA_BLOCKS
-      + StripedFileTestUtil.NUM_PARITY_BLOCKS;
+  private final static short GROUP_SIZE = (short) (StripedFileTestUtil.
+      NUM_DATA_BLOCKS + StripedFileTestUtil.NUM_PARITY_BLOCKS);
   private final static int SUCCESS = 0;
-  private final ErasureCodingPolicy sysDefaultPolicy = ErasureCodingPolicyManager
-      .getSystemDefaultPolicy();
+  private final ErasureCodingPolicy sysDefaultPolicy =
+      StripedFileTestUtil.TEST_EC_POLICY;

   @Before
   public void setupCluster() throws IOException {