HDFS-8334. Erasure coding: rename DFSStripedInputStream related test classes. Contributed by Zhe Zhang.

Authored by Zhe Zhang on 2015-05-06 15:34:37 -07:00; committed by Zhe Zhang
parent 6616de24cb
commit cea46f79b0
4 changed files with 424 additions and 419 deletions
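Renames in this commit: TestReadStripedFile becomes TestDFSStripedInputStream (read-path tests), and the old TestDFSStripedInputStream becomes TestWriteReadStripedFile (write-then-read tests).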

CHANGES-HDFS-EC-7285.txt

@@ -178,3 +178,8 @@
     HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks.
     (Yi Liu via Zhe Zhang)
 
+    HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng)
+
+    HDFS-8334. Erasure coding: rename DFSStripedInputStream related test
+    classes. (Zhe Zhang)

TestDFSStripedInputStream.java (modified)

@@ -17,245 +17,202 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 
 public class TestDFSStripedInputStream {
-  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
-  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
-  private static DistributedFileSystem fs;
-  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final static int stripesPerBlock = 4;
-  static int blockSize = cellSize * stripesPerBlock;
-  static int numDNs = dataBlocks + parityBlocks + 2;
-
-  private static MiniDFSCluster cluster;
+  public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private ECInfo info = new ECInfo(filePath.toString(),
+      ECSchemaManager.getSystemDefaultSchema());
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
 
-  @BeforeClass
-  public static void setup() throws IOException {
-    Configuration conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
+    cluster.waitActive();
     fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null);
   }
 
-  @AfterClass
-  public static void tearDown() {
+  @After
+  public void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
+  /**
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
+   */
   @Test
-  public void testFileEmpty() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
-  }
-
-  @Test
-  public void testFileSmallerThanOneCell1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
-  }
-
-  @Test
-  public void testFileSmallerThanOneCell2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
-  }
-
-  @Test
-  public void testFileEqualsWithOneCell() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
-  }
-
-  @Test
-  public void testFileSmallerThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
-        cellSize * dataBlocks - 1);
-  }
-
-  @Test
-  public void testFileSmallerThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
-        cellSize + 123);
-  }
-
-  @Test
-  public void testFileEqualsWithOneStripe() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
-        cellSize * dataBlocks);
-  }
-
-  @Test
-  public void testFileMoreThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
-        cellSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testFileMoreThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
-        cellSize * dataBlocks + cellSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testLessThanFullBlockGroup() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
-        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
-  }
-
-  @Test
-  public void testFileFullBlockGroup() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
-        blockSize * dataBlocks);
-  }
-
-  @Test
-  public void testFileMoreThanABlockGroup1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
-        blockSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testFileMoreThanABlockGroup2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
-        blockSize * dataBlocks + cellSize + 123);
-  }
-
-  @Test
-  public void testFileMoreThanABlockGroup3() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
-        blockSize * dataBlocks * 3 + cellSize * dataBlocks
-            + cellSize + 123);
-  }
-
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
-    }
-    return bytes;
-  }
+  public void testGetBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+    final DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
+
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock aLbList : lbList) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+      LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+        assertEquals(blks[j].getBlock(), refreshed.getBlock());
+        assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+        assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+      }
+    }
+  }
 
-  private byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
-  }
+  @Test
+  public void testPread() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < DATA_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(),
+            filePath.toString(), false, info);
+    int readSize = BLOCK_GROUP_SIZE;
+    byte[] readBuffer = new byte[readSize];
+    int ret = in.read(0, readBuffer, 0, readSize);
+    assertEquals(readSize, ret);
+    // TODO: verify read results with patterned data from HDFS-8117
+  }
 
-  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
-      throws IOException {
-    Path testPath = new Path(src);
-    final byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
-
-    //check file length
-    FileStatus status = fs.getFileStatus(testPath);
-    long fileLength = status.getLen();
-    Assert.assertEquals("File length should be the same",
-        writeBytes, fileLength);
-
-    // pread
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      byte[] buf = new byte[writeBytes + 100];
-      int readLen = fsdis.read(0, buf, 0, buf.length);
-      readLen = readLen >= 0 ? readLen : 0;
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf[i]);
-      }
-    }
-
-    // stateful read with byte array
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      byte[] buf = new byte[writeBytes + 100];
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf, readLen, buf.length - readLen);
-        if (ret > 0) {
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      readLen = readLen >= 0 ? readLen : 0;
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf[i]);
-      }
-    }
-
-    // stateful read with ByteBuffer
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf);
-        if (ret > 0) {
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      readLen = readLen >= 0 ? readLen : 0;
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      for (int i = 0; i < writeBytes; i++) {
-        Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
-            buf.array()[i]);
-      }
-    }
-
-    // stateful read with 1KB size byte array
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      final byte[] result = new byte[writeBytes];
-      final byte[] buf = new byte[1024];
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf, 0, buf.length);
-        if (ret > 0) {
-          System.arraycopy(buf, 0, result, readLen, ret);
-          readLen += ret;
-        }
-      } while (ret >= 0);
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      Assert.assertArrayEquals(bytes, result);
-    }
-
-    // stateful read using ByteBuffer with 1KB size
-    try (FSDataInputStream fsdis = fs.open(new Path(src))) {
-      final ByteBuffer result = ByteBuffer.allocate(writeBytes);
-      final ByteBuffer buf = ByteBuffer.allocate(1024);
-      int readLen = 0;
-      int ret;
-      do {
-        ret = fsdis.read(buf);
-        if (ret > 0) {
-          readLen += ret;
-          buf.flip();
-          result.put(buf);
-          buf.clear();
-        }
-      } while (ret >= 0);
-      Assert.assertEquals("The length of file should be the same to write size",
-          writeBytes, readLen);
-      Assert.assertArrayEquals(bytes, result.array());
-    }
-  }
+  @Test
+  public void testStatefulRead() throws Exception {
+    testStatefulRead(false, false);
+    testStatefulRead(true, false);
+    testStatefulRead(true, true);
+  }
+
+  private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
+      throws Exception {
+    final int numBlocks = 2;
+    final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
+    if (cellMisalignPacket) {
+      conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
+      tearDown();
+      setup();
+    }
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, fileSize);
+
+    assert lbs.getLocatedBlocks().size() == numBlocks;
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
+      for (int i = 0; i < DATA_BLK_NUM; i++) {
+        Block blk = new Block(bg.getBlock().getBlockId() + i,
+            NUM_STRIPE_PER_BLOCK * CELLSIZE,
+            bg.getBlock().getGenerationStamp());
+        blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+        cluster.injectBlocks(i, Arrays.asList(blk),
+            bg.getBlock().getBlockPoolId());
+      }
+    }
+
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(),
+            false, info);
+
+    byte[] expected = new byte[fileSize];
+
+    for (LocatedBlock bg : lbs.getLocatedBlocks()) {
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+      for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+        for (int j = 0; j < DATA_BLK_NUM; j++) {
+          for (int k = 0; k < CELLSIZE; k++) {
+            int posInBlk = i * CELLSIZE + k;
+            int posInFile = (int) bg.getStartOffset() +
+                i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+            expected[posInFile] = SimulatedFSDataset.simulatedByte(
+                new Block(bg.getBlock().getBlockId() + j), posInBlk);
+          }
+        }
+      }
+    }
+
+    if (useByteBuffer) {
+      ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer.array());
+    } else {
+      byte[] readBuffer = new byte[fileSize];
+      int done = 0;
+      while (done < fileSize) {
+        int ret = in.read(readBuffer, done, fileSize - done);
+        assertTrue(ret > 0);
+        done += ret;
+      }
+      assertArrayEquals(expected, readBuffer);
+    }
+    fs.delete(filePath, true);
+  }
 }

TestReadStripedFile.java (deleted)

@@ -1,218 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ECInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
public class TestReadStripedFile {
public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
private DistributedFileSystem fs;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private ECInfo info = new ECInfo(filePath.toString(),
ECSchemaManager.getSystemDefaultSchema());
private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final int NUM_STRIPE_PER_BLOCK = 2;
private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
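  // Geometry used throughout: a block group spans DATA_BLK_NUM internal data
  // blocks, each holding NUM_STRIPE_PER_BLOCK cells of CELLSIZE bytes.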
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
SimulatedFSDataset.setFactory(conf);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
DATA_BLK_NUM + PARITY_BLK_NUM).build();
cluster.waitActive();
fs = cluster.getFileSystem();
fs.mkdirs(dirPath);
fs.getClient().createErasureCodingZone(dirPath.toString(), null);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Test {@link DFSStripedInputStream#getBlockAt(long)}
*/
@Test
public void testGetBlock() throws Exception {
final int numBlocks = 4;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
final DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
List<LocatedBlock> lbList = lbs.getLocatedBlocks();
for (LocatedBlock aLbList : lbList) {
LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
for (int j = 0; j < DATA_BLK_NUM; j++) {
LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
assertEquals(blks[j].getBlock(), refreshed.getBlock());
assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
}
}
}
@Test
public void testPread() throws Exception {
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, BLOCK_GROUP_SIZE);
assert lbs.get(0) instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
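    // createStripedFile only creates the file's metadata on the NameNode, so
    // inject the group's internal blocks into the simulated DataNodes to give
    // the pread below data to return.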
for (int i = 0; i < DATA_BLK_NUM; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
NUM_STRIPE_PER_BLOCK * CELLSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(),
filePath.toString(), false, info);
int readSize = BLOCK_GROUP_SIZE;
byte[] readBuffer = new byte[readSize];
int ret = in.read(0, readBuffer, 0, readSize);
assertEquals(readSize, ret);
// TODO: verify read results with patterned data from HDFS-8117
}
@Test
public void testStatefulRead() throws Exception {
testStatefulRead(false, false);
testStatefulRead(true, false);
testStatefulRead(true, true);
}
private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
throws Exception {
final int numBlocks = 2;
final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
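    // Growing the I/O buffer by one byte makes client packets misalign with
    // cell boundaries; the cluster is restarted to pick up the new size.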
if (cellMisalignPacket) {
conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1);
tearDown();
setup();
}
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
filePath.toString(), 0, fileSize);
assert lbs.getLocatedBlocks().size() == numBlocks;
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
assert lb instanceof LocatedStripedBlock;
LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
for (int i = 0; i < DATA_BLK_NUM; i++) {
Block blk = new Block(bg.getBlock().getBlockId() + i,
NUM_STRIPE_PER_BLOCK * CELLSIZE,
bg.getBlock().getGenerationStamp());
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
cluster.injectBlocks(i, Arrays.asList(blk),
bg.getBlock().getBlockPoolId());
}
}
DFSStripedInputStream in =
new DFSStripedInputStream(fs.getClient(), filePath.toString(),
false, info);
byte[] expected = new byte[fileSize];
for (LocatedBlock bg : lbs.getLocatedBlocks()) {
/** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
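      // Stripe i, data block j, byte k of a block group maps to file offset
      // groupStart + i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k.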
for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
for (int j = 0; j < DATA_BLK_NUM; j++) {
for (int k = 0; k < CELLSIZE; k++) {
int posInBlk = i * CELLSIZE + k;
int posInFile = (int) bg.getStartOffset() +
i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
expected[posInFile] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + j), posInBlk);
}
}
}
}
if (useByteBuffer) {
ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
int done = 0;
while (done < fileSize) {
int ret = in.read(readBuffer);
assertTrue(ret > 0);
done += ret;
}
assertArrayEquals(expected, readBuffer.array());
} else {
byte[] readBuffer = new byte[fileSize];
int done = 0;
while (done < fileSize) {
int ret = in.read(readBuffer, done, fileSize - done);
assertTrue(ret > 0);
done += ret;
}
assertArrayEquals(expected, readBuffer);
}
fs.delete(filePath, true);
}
}

TestWriteReadStripedFile.java (new)

@@ -0,0 +1,261 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
public class TestWriteReadStripedFile {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
private static DistributedFileSystem fs;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int blockSize = cellSize * stripesPerBlock;
static int numDNs = dataBlocks + parityBlocks + 2;
private static MiniDFSCluster cluster;
@BeforeClass
public static void setup() throws IOException {
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testFileEmpty() throws IOException {
testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
}
@Test
public void testFileSmallerThanOneCell1() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
}
@Test
public void testFileSmallerThanOneCell2() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
}
@Test
public void testFileEqualsWithOneCell() throws IOException {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
}
@Test
public void testFileSmallerThanOneStripe1() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
cellSize * dataBlocks - 1);
}
@Test
public void testFileSmallerThanOneStripe2() throws IOException {
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
cellSize + 123);
}
@Test
public void testFileEqualsWithOneStripe() throws IOException {
testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
cellSize * dataBlocks);
}
@Test
public void testFileMoreThanOneStripe1() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
cellSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanOneStripe2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
cellSize * dataBlocks + cellSize * dataBlocks + 123);
}
@Test
public void testLessThanFullBlockGroup() throws IOException {
testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
}
@Test
public void testFileFullBlockGroup() throws IOException {
testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
blockSize * dataBlocks);
}
@Test
public void testFileMoreThanABlockGroup1() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
blockSize * dataBlocks + 123);
}
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
blockSize * dataBlocks + cellSize + 123);
}
@Test
public void testFileMoreThanABlockGroup3() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
blockSize * dataBlocks * 3 + cellSize * dataBlocks
+ cellSize + 123);
}
private byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
for (int i = 0; i < cnt; i++) {
bytes[i] = getByte(i);
}
return bytes;
}
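  // Deterministic data pattern: the byte at each position is (pos % 29) + 1,
  // so reads can be verified via getByte() without keeping the written data.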
private byte getByte(long pos) {
final int mod = 29;
return (byte) (pos % mod + 1);
}
private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException {
Path testPath = new Path(src);
final byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));
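    // The file falls under the erasure coding zone created on "/" in setup(),
    // so it is written in striped layout.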
//check file length
FileStatus status = fs.getFileStatus(testPath);
long fileLength = status.getLen();
Assert.assertEquals("File length should be the same",
writeBytes, fileLength);
// pread
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
int readLen = fsdis.read(0, buf, 0, buf.length);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
}
}
// stateful read with byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf, readLen, buf.length - readLen);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
}
}
// stateful read with ByteBuffer
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf.array()[i]);
}
}
// stateful read with 1KB size byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final byte[] result = new byte[writeBytes];
final byte[] buf = new byte[1024];
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf, 0, buf.length);
if (ret > 0) {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result);
}
// stateful read using ByteBuffer with 1KB size
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final ByteBuffer result = ByteBuffer.allocate(writeBytes);
final ByteBuffer buf = ByteBuffer.allocate(1024);
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf);
if (ret > 0) {
readLen += ret;
buf.flip();
result.put(buf);
buf.clear();
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result.array());
}
}
}