From cea46f79b0fabdf42bd3ac6e25fa263b75791f57 Mon Sep 17 00:00:00 2001
From: Zhe Zhang
Date: Wed, 6 May 2015 15:34:37 -0700
Subject: [PATCH] HDFS-8334. Erasure coding: rename DFSStripedInputStream
 related test classes. Contributed by Zhe Zhang.

---
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |   5 +
 .../hdfs/TestDFSStripedInputStream.java       | 359 ++++++++----------
 .../hadoop/hdfs/TestReadStripedFile.java      | 218 -----------
 .../hadoop/hdfs/TestWriteReadStripedFile.java | 261 +++++++++++++
 4 files changed, 424 insertions(+), 419 deletions(-)
 delete mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 0d2d4485e16..8729f8ae48e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -178,3 +178,8 @@
     HDFS-7348. Erasure Coding: DataNode reconstruct striped blocks. (Yi Liu via
     Zhe Zhang)
+
+    HADOOP-11921. Enhance tests for erasure coders. (Kai Zheng)
+
+    HDFS-8334. Erasure coding: rename DFSStripedInputStream related test
+    classes. (Zhe Zhang)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 11cdf7b4985..a1f704d5b1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -17,245 +17,202 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ECInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 
 public class TestDFSStripedInputStream {
-  private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
-  private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
+  public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
 
-  private static DistributedFileSystem fs;
-  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
-  private final static int stripesPerBlock = 4;
-  static int blockSize = cellSize * stripesPerBlock;
-  static int numDNs = dataBlocks + parityBlocks + 2;
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private ECInfo info = new ECInfo(filePath.toString(),
+      ECSchemaManager.getSystemDefaultSchema());
+  private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2;
+  private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE;
+  private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE;
 
-  private static MiniDFSCluster cluster;
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    Configuration conf = new Configuration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        DATA_BLK_NUM + PARITY_BLK_NUM).build();
+    cluster.waitActive();
     fs = cluster.getFileSystem();
+    fs.mkdirs(dirPath);
+    fs.getClient().createErasureCodingZone(dirPath.toString(), null);
   }
 
-  @AfterClass
-  public static void tearDown() {
+  @After
+  public void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
+  /**
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
+   */
   @Test
-  public void testFileEmpty() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
-  }
+  public void testGetBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks);
+    final DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info);
 
-  @Test
-  public void testFileSmallerThanOneCell1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
-  }
-
-  @Test
-  public void testFileSmallerThanOneCell2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
-  }
-
-  @Test
-  public void testFileEqualsWithOneCell() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
-  }
-
-  @Test
-  public void testFileSmallerThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
-        cellSize * dataBlocks - 1);
-  }
-
-  @Test
-  public void testFileSmallerThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
-        cellSize + 123);
-  }
-
-  @Test
-  public void testFileEqualsWithOneStripe() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
-        cellSize * dataBlocks);
-  }
-
-  @Test
-  public void testFileMoreThanOneStripe1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
-        cellSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testFileMoreThanOneStripe2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
-        cellSize * dataBlocks + cellSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testLessThanFullBlockGroup() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
-        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
-  }
-
-  @Test
-  public void testFileFullBlockGroup() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
-        blockSize * dataBlocks);
-  }
-
-  @Test
-  public void testFileMoreThanABlockGroup1() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
-        blockSize * dataBlocks + 123);
-  }
-
-  @Test
-  public void testFileMoreThanABlockGroup2() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
-        blockSize * dataBlocks + cellSize+ 123);
-  }
-
-
-  @Test
-  public void testFileMoreThanABlockGroup3() throws IOException {
-    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
-        blockSize * dataBlocks * 3 + cellSize * dataBlocks
-        + cellSize + 123);
-  }
-
-  private byte[] generateBytes(int cnt) {
-    byte[] bytes = new byte[cnt];
-    for (int i = 0; i < cnt; i++) {
-      bytes[i] = getByte(i);
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock aLbList : lbList) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+      LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb,
+          CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM);
+      for (int j = 0; j < DATA_BLK_NUM; j++) {
+        LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+        assertEquals(blks[j].getBlock(), refreshed.getBlock());
+        assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+        assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+      }
     }
-    return bytes;
   }
 
-  private byte getByte(long pos) {
-    final int mod = 29;
-    return (byte) (pos % mod + 1);
+  @Test
+  public void testPread() throws Exception {
+    final int numBlocks = 2;
+    DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+        NUM_STRIPE_PER_BLOCK, false);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < DATA_BLK_NUM; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i,
+          NUM_STRIPE_PER_BLOCK * CELLSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(),
+            filePath.toString(), false, info);
+    int readSize = BLOCK_GROUP_SIZE;
+    byte[] readBuffer = new byte[readSize];
+    int ret = in.read(0, readBuffer, 0, readSize);
+
+    assertEquals(readSize, ret);
+    // TODO: verify read results with patterned data from HDFS-8117
   }
 
-  private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
-      throws IOException {
-    Path testPath = new Path(src);
-    final byte[] bytes = generateBytes(writeBytes);
-    DFSTestUtil.writeFile(fs, testPath, new String(bytes));
+  @Test
+  public void testStatefulRead() throws Exception {
+    testStatefulRead(false, false);
+    testStatefulRead(true, false);
+    testStatefulRead(true, true);
+  }
 
-    //check file length
-    FileStatus status = fs.getFileStatus(testPath);
-    long fileLength = status.getLen();
Assert.assertEquals("File length should be the same", - writeBytes, fileLength); + private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket) + throws Exception { + final int numBlocks = 2; + final int fileSize = numBlocks * BLOCK_GROUP_SIZE; + if (cellMisalignPacket) { + conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1); + tearDown(); + setup(); + } + DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, + NUM_STRIPE_PER_BLOCK, false); + LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( + filePath.toString(), 0, fileSize); - // pread - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = fsdis.read(0, buf, 0, buf.length); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); + assert lbs.getLocatedBlocks().size() == numBlocks; + for (LocatedBlock lb : lbs.getLocatedBlocks()) { + assert lb instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock)(lb); + for (int i = 0; i < DATA_BLK_NUM; i++) { + Block blk = new Block(bg.getBlock().getBlockId() + i, + NUM_STRIPE_PER_BLOCK * CELLSIZE, + bg.getBlock().getGenerationStamp()); + blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); + cluster.injectBlocks(i, Arrays.asList(blk), + bg.getBlock().getBlockPoolId()); } } - // stateful read with byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - byte[] buf = new byte[writeBytes + 100]; - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf, readLen, buf.length - readLen); - if (ret > 0) { - readLen += ret; + DFSStripedInputStream in = + new DFSStripedInputStream(fs.getClient(), filePath.toString(), + false, info); + + byte[] expected = new byte[fileSize]; + + for (LocatedBlock bg : lbs.getLocatedBlocks()) { + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { + for (int j = 0; j < DATA_BLK_NUM; j++) { + for (int k = 0; k < CELLSIZE; k++) { + int posInBlk = i * CELLSIZE + k; + int posInFile = (int) bg.getStartOffset() + + i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; + expected[posInFile] = SimulatedFSDataset.simulatedByte( + new Block(bg.getBlock().getBlockId() + j), posInBlk); + } } - } while (ret >= 0); - readLen = readLen >= 0 ? readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf[i]); } } - // stateful read with ByteBuffer - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - } - } while (ret >= 0); - readLen = readLen >= 0 ? 
readLen : 0; - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - for (int i = 0; i < writeBytes; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), - buf.array()[i]); + if (useByteBuffer) { + ByteBuffer readBuffer = ByteBuffer.allocate(fileSize); + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer); + assertTrue(ret > 0); + done += ret; } + assertArrayEquals(expected, readBuffer.array()); + } else { + byte[] readBuffer = new byte[fileSize]; + int done = 0; + while (done < fileSize) { + int ret = in.read(readBuffer, done, fileSize - done); + assertTrue(ret > 0); + done += ret; + } + assertArrayEquals(expected, readBuffer); } - - // stateful read with 1KB size byte array - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final byte[] result = new byte[writeBytes]; - final byte[] buf = new byte[1024]; - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf, 0, buf.length); - if (ret > 0) { - System.arraycopy(buf, 0, result, readLen, ret); - readLen += ret; - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result); - } - - // stateful read using ByteBuffer with 1KB size - try (FSDataInputStream fsdis = fs.open(new Path(src))) { - final ByteBuffer result = ByteBuffer.allocate(writeBytes); - final ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - do { - ret = fsdis.read(buf); - if (ret > 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } while (ret >= 0); - Assert.assertEquals("The length of file should be the same to write size", - writeBytes, readLen); - Assert.assertArrayEquals(bytes, result.array()); - } + fs.delete(filePath, true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java deleted file mode 100644 index 1ad480e7f29..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ECInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; -import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; -import org.apache.hadoop.hdfs.server.namenode.ECSchemaManager; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -public class TestReadStripedFile { - - public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class); - - private MiniDFSCluster cluster; - private Configuration conf = new Configuration(); - private DistributedFileSystem fs; - private final Path dirPath = new Path("/striped"); - private Path filePath = new Path(dirPath, "file"); - private ECInfo info = new ECInfo(filePath.toString(), - ECSchemaManager.getSystemDefaultSchema()); - private final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS; - private final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS; - private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; - private final int NUM_STRIPE_PER_BLOCK = 2; - private final int INTERNAL_BLOCK_SIZE = NUM_STRIPE_PER_BLOCK * CELLSIZE; - private final int BLOCK_GROUP_SIZE = DATA_BLK_NUM * INTERNAL_BLOCK_SIZE; - - @Before - public void setup() throws IOException { - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, INTERNAL_BLOCK_SIZE); - SimulatedFSDataset.setFactory(conf); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes( - DATA_BLK_NUM + PARITY_BLK_NUM).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - fs.mkdirs(dirPath); - fs.getClient().createErasureCodingZone(dirPath.toString(), null); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - /** - * Test {@link DFSStripedInputStream#getBlockAt(long)} - */ - @Test - public void testGetBlock() throws Exception { - final int numBlocks = 4; - DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - NUM_STRIPE_PER_BLOCK, false); - LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( - filePath.toString(), 0, BLOCK_GROUP_SIZE * numBlocks); - final DFSStripedInputStream in = - new DFSStripedInputStream(fs.getClient(), filePath.toString(), false, info); - - List lbList = lbs.getLocatedBlocks(); - for (LocatedBlock aLbList : lbList) { - LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList; - LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(lsb, - CELLSIZE, DATA_BLK_NUM, PARITY_BLK_NUM); - for (int j = 0; j < DATA_BLK_NUM; j++) { - LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset()); - assertEquals(blks[j].getBlock(), refreshed.getBlock()); - assertEquals(blks[j].getStartOffset(), 
refreshed.getStartOffset()); - assertArrayEquals(blks[j].getLocations(), refreshed.getLocations()); - } - } - } - - @Test - public void testPread() throws Exception { - final int numBlocks = 2; - DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - NUM_STRIPE_PER_BLOCK, false); - LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( - filePath.toString(), 0, BLOCK_GROUP_SIZE); - - assert lbs.get(0) instanceof LocatedStripedBlock; - LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0)); - for (int i = 0; i < DATA_BLK_NUM; i++) { - Block blk = new Block(bg.getBlock().getBlockId() + i, - NUM_STRIPE_PER_BLOCK * CELLSIZE, - bg.getBlock().getGenerationStamp()); - blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); - cluster.injectBlocks(i, Arrays.asList(blk), - bg.getBlock().getBlockPoolId()); - } - DFSStripedInputStream in = - new DFSStripedInputStream(fs.getClient(), - filePath.toString(), false, info); - int readSize = BLOCK_GROUP_SIZE; - byte[] readBuffer = new byte[readSize]; - int ret = in.read(0, readBuffer, 0, readSize); - - assertEquals(readSize, ret); - // TODO: verify read results with patterned data from HDFS-8117 - } - - @Test - public void testStatefulRead() throws Exception { - testStatefulRead(false, false); - testStatefulRead(true, false); - testStatefulRead(true, true); - } - - private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket) - throws Exception { - final int numBlocks = 2; - final int fileSize = numBlocks * BLOCK_GROUP_SIZE; - if (cellMisalignPacket) { - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT + 1); - tearDown(); - setup(); - } - DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - NUM_STRIPE_PER_BLOCK, false); - LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( - filePath.toString(), 0, fileSize); - - assert lbs.getLocatedBlocks().size() == numBlocks; - for (LocatedBlock lb : lbs.getLocatedBlocks()) { - assert lb instanceof LocatedStripedBlock; - LocatedStripedBlock bg = (LocatedStripedBlock)(lb); - for (int i = 0; i < DATA_BLK_NUM; i++) { - Block blk = new Block(bg.getBlock().getBlockId() + i, - NUM_STRIPE_PER_BLOCK * CELLSIZE, - bg.getBlock().getGenerationStamp()); - blk.setGenerationStamp(bg.getBlock().getGenerationStamp()); - cluster.injectBlocks(i, Arrays.asList(blk), - bg.getBlock().getBlockPoolId()); - } - } - - DFSStripedInputStream in = - new DFSStripedInputStream(fs.getClient(), filePath.toString(), - false, info); - - byte[] expected = new byte[fileSize]; - - for (LocatedBlock bg : lbs.getLocatedBlocks()) { - /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ - for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) { - for (int j = 0; j < DATA_BLK_NUM; j++) { - for (int k = 0; k < CELLSIZE; k++) { - int posInBlk = i * CELLSIZE + k; - int posInFile = (int) bg.getStartOffset() + - i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k; - expected[posInFile] = SimulatedFSDataset.simulatedByte( - new Block(bg.getBlock().getBlockId() + j), posInBlk); - } - } - } - } - - if (useByteBuffer) { - ByteBuffer readBuffer = ByteBuffer.allocate(fileSize); - int done = 0; - while (done < fileSize) { - int ret = in.read(readBuffer); - assertTrue(ret > 0); - done += ret; - } - assertArrayEquals(expected, readBuffer.array()); - } else { - byte[] readBuffer = new byte[fileSize]; - int done = 0; - while (done < fileSize) { - int ret = in.read(readBuffer, done, fileSize - done); - assertTrue(ret > 0); - done += ret; - } - assertArrayEquals(expected, 
readBuffer); - } - fs.delete(filePath, true); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java new file mode 100644 index 00000000000..eacc6edddf6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class TestWriteReadStripedFile { + private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + + + private static DistributedFileSystem fs; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + static int blockSize = cellSize * stripesPerBlock; + static int numDNs = dataBlocks + parityBlocks + 2; + + private static MiniDFSCluster cluster; + + @BeforeClass + public static void setup() throws IOException { + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", null); + fs = cluster.getFileSystem(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFileEmpty() throws IOException { + testOneFileUsingDFSStripedInputStream("/EmptyFile", 0); + } + + @Test + public void testFileSmallerThanOneCell1() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1); + } + + @Test + public void testFileSmallerThanOneCell2() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1); + } + + @Test + public void testFileEqualsWithOneCell() throws IOException { + testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize); + } + + @Test + public void testFileSmallerThanOneStripe1() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", + cellSize * dataBlocks - 1); + } + + @Test + public void testFileSmallerThanOneStripe2() throws IOException { + 
testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", + cellSize + 123); + } + + @Test + public void testFileEqualsWithOneStripe() throws IOException { + testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", + cellSize * dataBlocks); + } + + @Test + public void testFileMoreThanOneStripe1() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", + cellSize * dataBlocks + 123); + } + + @Test + public void testFileMoreThanOneStripe2() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", + cellSize * dataBlocks + cellSize * dataBlocks + 123); + } + + @Test + public void testLessThanFullBlockGroup() throws IOException { + testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup", + cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize); + } + + @Test + public void testFileFullBlockGroup() throws IOException { + testOneFileUsingDFSStripedInputStream("/FullBlockGroup", + blockSize * dataBlocks); + } + + @Test + public void testFileMoreThanABlockGroup1() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", + blockSize * dataBlocks + 123); + } + + @Test + public void testFileMoreThanABlockGroup2() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", + blockSize * dataBlocks + cellSize+ 123); + } + + + @Test + public void testFileMoreThanABlockGroup3() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123); + } + + private byte[] generateBytes(int cnt) { + byte[] bytes = new byte[cnt]; + for (int i = 0; i < cnt; i++) { + bytes[i] = getByte(i); + } + return bytes; + } + + private byte getByte(long pos) { + final int mod = 29; + return (byte) (pos % mod + 1); + } + + private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) + throws IOException { + Path testPath = new Path(src); + final byte[] bytes = generateBytes(writeBytes); + DFSTestUtil.writeFile(fs, testPath, new String(bytes)); + + //check file length + FileStatus status = fs.getFileStatus(testPath); + long fileLength = status.getLen(); + Assert.assertEquals("File length should be the same", + writeBytes, fileLength); + + // pread + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + byte[] buf = new byte[writeBytes + 100]; + int readLen = fsdis.read(0, buf, 0, buf.length); + readLen = readLen >= 0 ? readLen : 0; + Assert.assertEquals("The length of file should be the same to write size", + writeBytes, readLen); + for (int i = 0; i < writeBytes; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), + buf[i]); + } + } + + // stateful read with byte array + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + byte[] buf = new byte[writeBytes + 100]; + int readLen = 0; + int ret; + do { + ret = fsdis.read(buf, readLen, buf.length - readLen); + if (ret > 0) { + readLen += ret; + } + } while (ret >= 0); + readLen = readLen >= 0 ? 
readLen : 0; + Assert.assertEquals("The length of file should be the same to write size", + writeBytes, readLen); + for (int i = 0; i < writeBytes; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), + buf[i]); + } + } + + // stateful read with ByteBuffer + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100); + int readLen = 0; + int ret; + do { + ret = fsdis.read(buf); + if (ret > 0) { + readLen += ret; + } + } while (ret >= 0); + readLen = readLen >= 0 ? readLen : 0; + Assert.assertEquals("The length of file should be the same to write size", + writeBytes, readLen); + for (int i = 0; i < writeBytes; i++) { + Assert.assertEquals("Byte at " + i + " should be the same", getByte(i), + buf.array()[i]); + } + } + + // stateful read with 1KB size byte array + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + final byte[] result = new byte[writeBytes]; + final byte[] buf = new byte[1024]; + int readLen = 0; + int ret; + do { + ret = fsdis.read(buf, 0, buf.length); + if (ret > 0) { + System.arraycopy(buf, 0, result, readLen, ret); + readLen += ret; + } + } while (ret >= 0); + Assert.assertEquals("The length of file should be the same to write size", + writeBytes, readLen); + Assert.assertArrayEquals(bytes, result); + } + + // stateful read using ByteBuffer with 1KB size + try (FSDataInputStream fsdis = fs.open(new Path(src))) { + final ByteBuffer result = ByteBuffer.allocate(writeBytes); + final ByteBuffer buf = ByteBuffer.allocate(1024); + int readLen = 0; + int ret; + do { + ret = fsdis.read(buf); + if (ret > 0) { + readLen += ret; + buf.flip(); + result.put(buf); + buf.clear(); + } + } while (ret >= 0); + Assert.assertEquals("The length of file should be the same to write size", + writeBytes, readLen); + Assert.assertArrayEquals(bytes, result.array()); + } + } +}
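
A note on the offset arithmetic that testStatefulRead relies on when filling its "expected" buffer: in the striped layout, byte k of the cell at stripe i of internal data block j lives at bgOffset + i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k in the file, but at i * CELLSIZE + k inside the internal block, which is the offset the test feeds to SimulatedFSDataset.simulatedByte(). The standalone sketch below restates that mapping; it is illustrative only and not part of the patch: the class name is hypothetical, and the two constant values are assumptions standing in for HdfsConstants.NUM_DATA_BLOCKS and HdfsConstants.BLOCK_STRIPED_CELL_SIZE.

/**
 * A minimal sketch of the striped-layout offset arithmetic used by
 * testStatefulRead above. Illustrative only: the class name is hypothetical
 * and the constants are assumed values, not taken from HdfsConstants.
 */
public class StripedLayoutSketch {
  static final int DATA_BLK_NUM = 6;      // assumed data-block count per group
  static final int CELLSIZE = 64 * 1024;  // assumed striping cell size

  /** File offset of byte k in the cell at (stripe i, internal data block j). */
  static long posInFile(long bgOffset, int i, int j, int k) {
    return bgOffset + (long) i * CELLSIZE * DATA_BLK_NUM + (long) j * CELLSIZE + k;
  }

  /** Offset of that same byte within internal block j. */
  static long posInBlk(int i, int k) {
    return (long) i * CELLSIZE + k;
  }

  public static void main(String[] args) {
    // Cells round-robin across the data blocks of a stripe, so the second
    // cell of the first stripe (j = 1) starts exactly one cell into the file.
    assert posInFile(0, 0, 1, 0) == CELLSIZE;
    // The second stripe of block 0 (i = 1) starts after one full stripe,
    // i.e. DATA_BLK_NUM cells, of file data ...
    assert posInFile(0, 1, 0, 0) == (long) CELLSIZE * DATA_BLK_NUM;
    // ... yet it sits only one cell deep inside its internal block.
    assert posInBlk(1, 0) == CELLSIZE;
    System.out.println("striped offset mapping holds (run with java -ea)");
  }
}

Under this mapping each internal block holds exactly one cell per stripe, which is why the tests can inject internal blocks of length NUM_STRIPE_PER_BLOCK * CELLSIZE and still predict every byte of the file.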