HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)

This commit is contained in:
waltersu4549 2015-05-18 19:10:37 +08:00 committed by Zhe Zhang
parent c9103e9cac
commit a919726914
1 changed files with 151 additions and 122 deletions

View File

@ -21,9 +21,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -33,23 +37,26 @@ import org.junit.Test;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Random;
public class TestWriteReadStripedFile { public class TestWriteReadStripedFile {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private static DistributedFileSystem fs;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4; private final static int stripesPerBlock = 4;
static int blockSize = cellSize * stripesPerBlock; static int blockSize = cellSize * stripesPerBlock;
static int numDNs = dataBlocks + parityBlocks + 2; static int numDNs = dataBlocks + parityBlocks + 2;
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
private static Configuration conf;
private static FileSystem fs;
private static Random r= new Random();
@BeforeClass @BeforeClass
public static void setup() throws IOException { public static void setup() throws IOException {
Configuration conf = new Configuration(); conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null); cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
@ -134,7 +141,7 @@ public class TestWriteReadStripedFile {
@Test @Test
public void testFileMoreThanABlockGroup2() throws IOException { public void testFileMoreThanABlockGroup2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
blockSize * dataBlocks + cellSize+ 123); blockSize * dataBlocks + cellSize + 123);
} }
@ -182,136 +189,105 @@ public class TestWriteReadStripedFile {
} }
} }
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
throws IOException { throws IOException {
Path testPath = new Path(src);
final byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
//check file length final byte[] expected = generateBytes(fileLength);
FileStatus status = fs.getFileStatus(testPath); Path srcPath = new Path(src);
long fileLength = status.getLen(); DFSTestUtil.writeFile(fs, srcPath, new String(expected));
verifyLength(fs, srcPath, fileLength);
byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
verifyPread(fs, srcPath, fileLength, expected, largeBuf);
verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
verifySeek(fs, srcPath, fileLength);
verifyStatefulRead(fs, srcPath, fileLength, expected,
ByteBuffer.allocate(fileLength + 100));
verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
verifyStatefulRead(fs, srcPath, fileLength, expected,
ByteBuffer.allocate(1024));
}
@Test
public void testWriteReadUsingWebHdfs() throws Exception {
int fileLength = blockSize * dataBlocks + cellSize + 123;
final byte[] expected = generateBytes(fileLength);
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
DFSTestUtil.writeFile(fs, srcPath, new String(expected));
verifyLength(fs, srcPath, fileLength);
byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
verifyPread(fs, srcPath, fileLength, expected, largeBuf);
verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
verifySeek(fs, srcPath, fileLength);
verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
//webhdfs doesn't support bytebuffer read
}
void verifyLength(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FileStatus status = fs.getFileStatus(srcPath);
Assert.assertEquals("File length should be the same", Assert.assertEquals("File length should be the same",
writeBytes, fileLength); fileLength, status.getLen());
// pread
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
int readLen = fsdis.read(0, buf, 0, buf.length);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
}
} }
// stateful read with byte array void verifyPread(FileSystem fs, Path srcPath, int fileLength,
try (FSDataInputStream fsdis = fs.open(new Path(src))) { byte[] expected, byte[] buf) throws IOException {
byte[] buf = new byte[writeBytes + 100]; FSDataInputStream in = fs.open(srcPath);
int readLen = readAll(fsdis, buf); int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
Assert.assertEquals("The length of file should be the same to write size", cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
writeBytes, readLen); cellSize * dataBlocks, fileLength - 102, fileLength - 1};
for (int i = 0; i < writeBytes; i++) { for (int startOffset : startOffsets) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
buf[i]); int remaining = fileLength - startOffset;
in.readFully(startOffset, buf, 0, remaining);
for (int i = 0; i < remaining; i++) {
Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
"same",
expected[startOffset + i], buf[i]);
} }
} }
in.close();
// seek and stateful read
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
// seek to 1/2 of content
int pos = writeBytes/2;
assertSeekAndRead(fsdis, pos, writeBytes);
// seek to 1/3 of content
pos = writeBytes/3;
assertSeekAndRead(fsdis, pos, writeBytes);
// seek to 0 pos
pos = 0;
assertSeekAndRead(fsdis, pos, writeBytes);
if (writeBytes > cellSize) {
// seek to cellSize boundary
pos = cellSize -1;
assertSeekAndRead(fsdis, pos, writeBytes);
} }
if (writeBytes > cellSize * dataBlocks) { void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
// seek to striped cell group boundary byte[] expected, byte[] buf) throws IOException {
pos = cellSize * dataBlocks - 1; FSDataInputStream in = fs.open(srcPath);
assertSeekAndRead(fsdis, pos, writeBytes); final byte[] result = new byte[fileLength];
}
if (writeBytes > blockSize * dataBlocks) {
// seek to striped block group boundary
pos = blockSize * dataBlocks - 1;
assertSeekAndRead(fsdis, pos, writeBytes);
}
try {
fsdis.seek(-1);
Assert.fail("Should be failed if seek to negative offset");
} catch (EOFException e) {
// expected
}
try {
fsdis.seek(writeBytes + 1);
Assert.fail("Should be failed if seek after EOF");
} catch (EOFException e) {
// expected
}
}
// stateful read with ByteBuffer
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
int readLen = 0; int readLen = 0;
int ret; int ret;
do { do {
ret = fsdis.read(buf); ret = in.read(buf, 0, buf.length);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf.array()[i]);
}
}
// stateful read with 1KB size byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final byte[] result = new byte[writeBytes];
final byte[] buf = new byte[1024];
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf, 0, buf.length);
if (ret > 0) { if (ret > 0) {
System.arraycopy(buf, 0, result, readLen, ret); System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret; readLen += ret;
} }
} while (ret >= 0); } while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size", Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen); fileLength, readLen);
Assert.assertArrayEquals(bytes, result); Assert.assertArrayEquals(expected, result);
in.close();
} }
// stateful read using ByteBuffer with 1KB size
try (FSDataInputStream fsdis = fs.open(new Path(src))) { void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
final ByteBuffer result = ByteBuffer.allocate(writeBytes); byte[] expected, ByteBuffer buf) throws IOException {
final ByteBuffer buf = ByteBuffer.allocate(1024); FSDataInputStream in = fs.open(srcPath);
ByteBuffer result = ByteBuffer.allocate(fileLength);
int readLen = 0; int readLen = 0;
int ret; int ret;
do { do {
ret = fsdis.read(buf); ret = in.read(buf);
if (ret > 0) { if (ret > 0) {
readLen += ret; readLen += ret;
buf.flip(); buf.flip();
@ -319,10 +295,63 @@ public class TestWriteReadStripedFile {
buf.clear(); buf.clear();
} }
} while (ret >= 0); } while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size", Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen); fileLength, readLen);
Assert.assertArrayEquals(bytes, result.array()); Assert.assertArrayEquals(expected, result.array());
in.close();
} }
void verifySeek(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FSDataInputStream in = fs.open(srcPath);
// seek to 1/2 of content
int pos = fileLength / 2;
assertSeekAndRead(in, pos, fileLength);
// seek to 1/3 of content
pos = fileLength / 3;
assertSeekAndRead(in, pos, fileLength);
// seek to 0 pos
pos = 0;
assertSeekAndRead(in, pos, fileLength);
if (fileLength > cellSize) {
// seek to cellSize boundary
pos = cellSize - 1;
assertSeekAndRead(in, pos, fileLength);
}
if (fileLength > cellSize * dataBlocks) {
// seek to striped cell group boundary
pos = cellSize * dataBlocks - 1;
assertSeekAndRead(in, pos, fileLength);
}
if (fileLength > blockSize * dataBlocks) {
// seek to striped block group boundary
pos = blockSize * dataBlocks - 1;
assertSeekAndRead(in, pos, fileLength);
}
if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){
try {
in.seek(-1);
Assert.fail("Should be failed if seek to negative offset");
} catch (EOFException e) {
// expected
}
try {
in.seek(fileLength + 1);
Assert.fail("Should be failed if seek after EOF");
} catch (EOFException e) {
// expected
}
}
in.close();
} }
@Test @Test