HDFS-8978. Erasure coding: fix 2 failed tests of DFSStripedOutputStream. Contributed by Walter Su.

This commit is contained in:
Jing Zhao 2015-09-02 17:41:08 -07:00
parent ddf4e78547
commit 60bd765ac1
10 changed files with 130 additions and 63 deletions

View File

@ -409,3 +409,6 @@
HDFS-8937. Erasure coding: do not throw exception when setting replication on
EC file. (Gao Rui via jing9)
HDFS-8978. Erasure coding: fix 2 failed tests of DFSStripedOutputStream.
(Walter Su via jing9)

View File

@ -67,7 +67,7 @@ public class StripedDataStreamer extends DataStreamer {
this.queue = queue;
}
T poll(final int i) throws InterruptedIOException {
T poll(final int i) throws IOException {
for(;;) {
synchronized(queue) {
final T polled = queue.poll(i);
@ -80,6 +80,7 @@ public class StripedDataStreamer extends DataStreamer {
return queue.poll(i);
} catch(IOException ioe) {
LOG.warn("Failed to populate, " + this, ioe);
throw ioe;
}
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
@ -25,6 +26,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
import org.junit.Assert;
@ -32,8 +35,11 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
public class StripedFileTestUtil {
public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
@ -224,6 +230,57 @@ public class StripedFileTestUtil {
}
}
/**
* If the length of blockGroup is less than a full stripe, it returns the the
* number of actual data internal blocks. Otherwise returns NUM_DATA_BLOCKS.
*/
public static short getRealDataBlockNum(int numBytes) {
return (short) Math.min(dataBlocks,
(numBytes - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
}
public static short getRealTotalBlockNum(int numBytes) {
return (short) (getRealDataBlockNum(numBytes) + parityBlocks);
}
/**
* Wait for all the internalBlocks of the blockGroups of the given file to be reported.
*/
public static void waitBlockGroupsReported(DistributedFileSystem fs, String src)
throws IOException, InterruptedException, TimeoutException {
boolean success;
final int ATTEMPTS = 40;
int count = 0;
do {
success = true;
count++;
LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0);
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
short expected = getRealTotalBlockNum((int) lb.getBlockSize());
int reported = lb.getLocations().length;
if (reported != expected){
success = false;
System.out.println("blockGroup " + lb.getBlock() + " of file " + src
+ " has reported internalBlocks " + reported
+ " (desired " + expected + "); locations "
+ Joiner.on(' ').join(lb.getLocations()));
Thread.sleep(1000);
break;
}
}
if (success) {
System.out.println("All blockGroups of file " + src
+ " verified to have all internalBlocks.");
}
} while (!success && count < ATTEMPTS);
if (count == ATTEMPTS) {
throw new TimeoutException("Timed out waiting for " + src +
" to have all the internalBlocks");
}
}
/**
* Generate n random and different numbers within
* specified non-negative integer range

View File

@ -70,6 +70,7 @@ public class TestDFSStripedInputStream {
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
DATA_BLK_NUM + PARITY_BLK_NUM).build();

View File

@ -66,6 +66,7 @@ public class TestDFSStripedOutputStream {
int numDNs = dataBlocks + parityBlocks + 2;
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
@ -79,76 +80,76 @@ public class TestDFSStripedOutputStream {
}
@Test
public void testFileEmpty() throws IOException {
public void testFileEmpty() throws Exception {
testOneFile("/EmptyFile", 0);
}
@Test
public void testFileSmallerThanOneCell1() throws IOException {
public void testFileSmallerThanOneCell1() throws Exception {
testOneFile("/SmallerThanOneCell", 1);
}
@Test
public void testFileSmallerThanOneCell2() throws IOException {
public void testFileSmallerThanOneCell2() throws Exception {
testOneFile("/SmallerThanOneCell", cellSize - 1);
}
@Test
public void testFileEqualsWithOneCell() throws IOException {
public void testFileEqualsWithOneCell() throws Exception {
testOneFile("/EqualsWithOneCell", cellSize);
}
@Test
public void testFileSmallerThanOneStripe1() throws IOException {
public void testFileSmallerThanOneStripe1() throws Exception {
testOneFile("/SmallerThanOneStripe", cellSize * dataBlocks - 1);
}
@Test
public void testFileSmallerThanOneStripe2() throws IOException {
public void testFileSmallerThanOneStripe2() throws Exception {
testOneFile("/SmallerThanOneStripe", cellSize + 123);
}
@Test
public void testFileEqualsWithOneStripe() throws IOException {
public void testFileEqualsWithOneStripe() throws Exception {
testOneFile("/EqualsWithOneStripe", cellSize * dataBlocks);
}
@Test
public void testFileMoreThanOneStripe1() throws IOException {
public void testFileMoreThanOneStripe1() throws Exception {
testOneFile("/MoreThanOneStripe1", cellSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanOneStripe2() throws IOException {
public void testFileMoreThanOneStripe2() throws Exception {
testOneFile("/MoreThanOneStripe2", cellSize * dataBlocks
+ cellSize * dataBlocks + 123);
}
@Test
public void testFileLessThanFullBlockGroup() throws IOException {
public void testFileLessThanFullBlockGroup() throws Exception {
testOneFile("/LessThanFullBlockGroup",
cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
}
@Test
public void testFileFullBlockGroup() throws IOException {
public void testFileFullBlockGroup() throws Exception {
testOneFile("/FullBlockGroup", blockSize * dataBlocks);
}
@Test
public void testFileMoreThanABlockGroup1() throws IOException {
public void testFileMoreThanABlockGroup1() throws Exception {
testOneFile("/MoreThanABlockGroup1", blockSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
public void testFileMoreThanABlockGroup2() throws Exception {
testOneFile("/MoreThanABlockGroup2",
blockSize * dataBlocks + cellSize+ 123);
}
@Test
public void testFileMoreThanABlockGroup3() throws IOException {
public void testFileMoreThanABlockGroup3() throws Exception {
testOneFile("/MoreThanABlockGroup3",
blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ cellSize + 123);
@ -167,12 +168,13 @@ public class TestDFSStripedOutputStream {
return (byte) (pos % mod + 1);
}
private void testOneFile(String src, int writeBytes) throws IOException {
private void testOneFile(String src, int writeBytes) throws Exception {
src += "_" + writeBytes;
Path testPath = new Path(src);
byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
// check file length
FileStatus status = fs.getFileStatus(testPath);

View File

@ -56,6 +56,7 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
public class TestDFSStripedOutputStreamWithFailure {
public static final Log LOG = LogFactory.getLog(
TestDFSStripedOutputStreamWithFailure.class);
@ -135,6 +136,7 @@ public class TestDFSStripedOutputStreamWithFailure {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
return conf;
}
@ -331,6 +333,13 @@ public class TestDFSStripedOutputStreamWithFailure {
}
}
out.close();
short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
if (length > dnIndex * CELL_SIZE || dnIndex >= NUM_DATA_BLOCKS) {
expectedReported--;
}
DFSTestUtil.waitReplication(dfs, p, expectedReported);
Assert.assertTrue(killed);
// check file length

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
@ -62,6 +63,9 @@ public class TestReadStripedFileWithDecoding {
@Before
public void setup() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
.numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
@ -80,7 +84,7 @@ public class TestReadStripedFileWithDecoding {
* Verify the decoding works correctly.
*/
@Test(timeout=300000)
public void testReadWithDNFailure() throws IOException {
public void testReadWithDNFailure() throws Exception {
for (int fileLength : fileLengths) {
for (int dnFailureNum : dnFailureNums) {
try {
@ -161,7 +165,7 @@ public class TestReadStripedFileWithDecoding {
}
private void testReadWithDNFailure(int fileLength, int dnFailureNum)
throws IOException {
throws Exception {
String fileType = fileLength < (blockSize * dataBlocks) ?
"smallFile" : "largeFile";
String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
@ -172,6 +176,7 @@ public class TestReadStripedFileWithDecoding {
Path testPath = new Path(src);
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, testPath, bytes);
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
// shut down the DN that holds an internal data block
BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,

View File

@ -44,13 +44,14 @@ public class TestReadStripedFileWithMissingBlocks {
public static final Log LOG = LogFactory
.getLog(TestReadStripedFileWithMissingBlocks.class);
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static DistributedFileSystem fs;
private static Configuration conf = new HdfsConfiguration();
private final int fileLength = blockSize * dataBlocks + 123;
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
@ -64,42 +65,43 @@ public class TestReadStripedFileWithMissingBlocks {
}
@Test
public void testReadFileWithMissingBlocks1() throws IOException {
public void testReadFileWithMissingBlocks1() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 0);
}
@Test
public void testReadFileWithMissingBlocks2() throws IOException {
public void testReadFileWithMissingBlocks2() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 1);
}
@Test
public void testReadFileWithMissingBlocks3() throws IOException {
public void testReadFileWithMissingBlocks3() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 1, 2);
}
@Test
public void testReadFileWithMissingBlocks4() throws IOException {
public void testReadFileWithMissingBlocks4() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 0);
}
@Test
public void testReadFileWithMissingBlocks5() throws IOException {
public void testReadFileWithMissingBlocks5() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 2, 1);
}
@Test
public void testReadFileWithMissingBlocks6() throws IOException {
public void testReadFileWithMissingBlocks6() throws Exception {
readFileWithMissingBlocks(new Path("/foo"), fileLength, 3, 0);
}
private void readFileWithMissingBlocks(Path srcPath, int fileLength,
int missingDataNum, int missingParityNum)
throws IOException {
throws Exception {
LOG.info("readFileWithMissingBlocks: (" + missingDataNum + ","
+ missingParityNum + ")");
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, srcPath, new String(expected));
StripedFileTestUtil.waitBlockGroupsReported(fs, srcPath.toUri().getPath());
StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
int dataBlocks = (fileLength - 1) / cellSize + 1;
BlockLocation[] locs = fs.getFileBlockLocations(srcPath, 0, cellSize);

View File

@ -183,7 +183,10 @@ public class TestRecoverStripedFile {
Path file = new Path(fileName);
testCreateStripedFile(file, fileLen);
final byte[] data = new byte[fileLen];
ThreadLocalRandom.current().nextBytes(data);
DFSTestUtil.writeFile(fs, file, data);
StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
LocatedBlocks locatedBlocks = getLocatedBlocks(file);
assertEquals(locatedBlocks.getFileLength(), fileLen);
@ -380,21 +383,4 @@ public class TestRecoverStripedFile {
private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
}
private void testCreateStripedFile(Path file, int dataLen)
throws IOException {
final byte[] data = new byte[dataLen];
ThreadLocalRandom.current().nextBytes(data);
writeContents(file, data);
}
void writeContents(Path file, byte[] contents)
throws IOException {
FSDataOutputStream out = fs.create(file);
try {
out.write(contents, 0, contents.length);
} finally {
out.close();
}
}
}

View File

@ -45,7 +45,7 @@ import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
public class TestWriteReadStripedFile {
public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
private static MiniDFSCluster cluster;
private static FileSystem fs;
private static DistributedFileSystem fs;
private static Configuration conf = new HdfsConfiguration();
static {
@ -69,32 +69,32 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileEmpty() throws IOException {
public void testFileEmpty() throws Exception {
testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true);
}
@Test
public void testFileSmallerThanOneCell1() throws IOException {
public void testFileSmallerThanOneCell1() throws Exception {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true);
}
@Test
public void testFileSmallerThanOneCell2() throws IOException {
public void testFileSmallerThanOneCell2() throws Exception {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1,
true);
}
@Test
public void testFileEqualsWithOneCell() throws IOException {
public void testFileEqualsWithOneCell() throws Exception {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true);
}
@Test
public void testFileSmallerThanOneStripe1() throws IOException {
public void testFileSmallerThanOneStripe1() throws Exception {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
cellSize * dataBlocks - 1);
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
@ -102,7 +102,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileSmallerThanOneStripe2() throws IOException {
public void testFileSmallerThanOneStripe2() throws Exception {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
cellSize + 123);
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
@ -110,7 +110,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileEqualsWithOneStripe() throws IOException {
public void testFileEqualsWithOneStripe() throws Exception {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
cellSize * dataBlocks);
testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2",
@ -118,7 +118,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileMoreThanOneStripe1() throws IOException {
public void testFileMoreThanOneStripe1() throws Exception {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
cellSize * dataBlocks + 123);
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12",
@ -126,7 +126,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileMoreThanOneStripe2() throws IOException {
public void testFileMoreThanOneStripe2() throws Exception {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
cellSize * dataBlocks + cellSize * dataBlocks + 123);
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22",
@ -134,7 +134,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testLessThanFullBlockGroup() throws IOException {
public void testLessThanFullBlockGroup() throws Exception {
testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2",
@ -142,7 +142,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileFullBlockGroup() throws IOException {
public void testFileFullBlockGroup() throws Exception {
testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
blockSize * dataBlocks);
testOneFileUsingDFSStripedInputStream("/FullBlockGroup2",
@ -150,7 +150,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileMoreThanABlockGroup1() throws IOException {
public void testFileMoreThanABlockGroup1() throws Exception {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
blockSize * dataBlocks + 123);
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12",
@ -158,7 +158,7 @@ public class TestWriteReadStripedFile {
}
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
public void testFileMoreThanABlockGroup2() throws Exception {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
blockSize * dataBlocks + cellSize + 123);
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22",
@ -167,7 +167,7 @@ public class TestWriteReadStripedFile {
@Test
public void testFileMoreThanABlockGroup3() throws IOException {
public void testFileMoreThanABlockGroup3() throws Exception {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ cellSize + 123);
@ -177,15 +177,16 @@ public class TestWriteReadStripedFile {
}
private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
throws IOException {
throws Exception {
testOneFileUsingDFSStripedInputStream(src, fileLength, false);
}
private void testOneFileUsingDFSStripedInputStream(String src, int fileLength,
boolean withDataNodeFailure) throws IOException {
boolean withDataNodeFailure) throws Exception {
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
Path srcPath = new Path(src);
DFSTestUtil.writeFile(fs, srcPath, new String(expected));
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);