diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 1ded20316d5..673fbab6abb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -379,3 +379,6 @@
     HDFS-8769. Erasure coding: unit test for SequentialBlockGroupIdGenerator.
     (Rakesh R via waltersu4549)
+
+    HDFS-8202. Improve end to end striped file test to add erasure recovery
+    test. (Xinwei Qin via zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 2866a0ee0c6..ca4b2aabd3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
 import org.junit.Assert;
@@ -29,8 +32,11 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class StripedFileTestUtil {
+  public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
+
   static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
   static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
@@ -193,4 +199,63 @@ public class StripedFileTestUtil {
           StripedFileTestUtil.getByte(pos + i), buf[i]);
     }
   }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
+  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+    for(;;) {
+      final DatanodeInfo[] datanodes = streamer.getNodes();
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ignored) {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Generate n distinct random integers within the
+   * non-negative, half-open range [min, max).
+   * @param min minimum of the range (inclusive)
+   * @param max maximum of the range (exclusive)
+   * @param n number of values to generate
+   * @return an array of n distinct values in [min, max), or null if invalid
+   */
+  public static int[] randomArray(int min, int max, int n) {
+    if (n > (max - min) || max < min || min < 0 || max < 0) {
+      return null;
+    }
+    int[] result = new int[n];
+    for (int i = 0; i < n; i++) {
+      result[i] = -1;
+    }
+
+    int count = 0;
+    while (count < n) {
+      int num = (int) (Math.random() * (max - min)) + min;
+      boolean flag = true;
+      for (int j = 0; j < n; j++) {
+        if (num == result[j]) {
+          flag = false;
+          break;
+        }
+      }
+      if (flag) {
+        result[count] = num;
+        count++;
+      }
+    }
+    return result;
+  }
 }
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index 54fcdf8c099..6594ae1b47e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -181,7 +181,7 @@ public class TestDFSStripedOutputStreamWithFailure { waitTokenExpires(out); } - killDatanode(cluster, stripedOut, dnIndex, pos); + StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos); killed = true; } @@ -217,30 +217,6 @@ public class TestDFSStripedOutputStreamWithFailure { } - static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { - for(;;) { - final DatanodeInfo[] datanodes = streamer.getNodes(); - if (datanodes != null) { - Assert.assertEquals(1, datanodes.length); - Assert.assertNotNull(datanodes[0]); - return datanodes[0]; - } - try { - Thread.sleep(100); - } catch (InterruptedException ignored) { - return null; - } - } - } - - static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out, - final int dnIndex, final AtomicInteger pos) { - final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex); - final DatanodeInfo datanode = getDatanodes(s); - LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos); - cluster.stopDataNode(datanode.getXferAddr()); - } - static void checkData(DistributedFileSystem dfs, String src, int length, int killedDnIndex, long oldGS) throws IOException { List> blockGroupList = new ArrayList<>(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java index 8afea198a7e..1719d3f54a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -20,10 +20,11 @@ package org.apache.hadoop.hdfs; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -43,13 +44,21 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import static org.apache.hadoop.hdfs.StripedFileTestUtil.*; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks; public class TestReadStripedFileWithDecoding { static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class); private MiniDFSCluster cluster; 
private DistributedFileSystem fs; + private final int smallFileLength = blockSize * dataBlocks - 123; + private final int largeFileLength = blockSize * dataBlocks + 123; + private final int[] fileLengths = {smallFileLength, largeFileLength}; + private final int[] dnFailureNums = {1, 2, 3}; @Before public void setup() throws IOException { @@ -67,82 +76,64 @@ public class TestReadStripedFileWithDecoding { } } - @Test - public void testReadWithDNFailure1() throws IOException { - testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0); - } - - @Test - public void testReadWithDNFailure2() throws IOException { - testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5); - } - - @Test - public void testReadWithDNFailure3() throws IOException { - testReadWithDNFailure("/foo", cellSize * dataBlocks, 0); - } - /** - * Delete a data block before reading. Verify the decoding works correctly. + * Shutdown tolerable number of Datanode before reading. + * Verify the decoding works correctly. */ - @Test - public void testReadCorruptedData() throws IOException { - // create file - final Path file = new Path("/partially_deleted"); - final int length = cellSize * dataBlocks * 2; - final byte[] bytes = StripedFileTestUtil.generateBytes(length); - DFSTestUtil.writeFile(fs, file, bytes); - - // corrupt the first data block - // find the corresponding data node - int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); - Assert.assertNotEquals(-1, dnIndex); - // find the target block - LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() - .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); - final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, - cellSize, dataBlocks, parityBlocks); - // find the target block file - File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); - File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); - Assert.assertTrue("Block file does not exist", blkFile.exists()); - // delete the block file - LOG.info("Deliberately removing file " + blkFile.getName()); - Assert.assertTrue("Cannot remove file", blkFile.delete()); - verifyRead(file, length, bytes); - } - - /** - * Corrupt the content of the data block before reading. 
- */ - @Test - public void testReadCorruptedData2() throws IOException { - // create file - final Path file = new Path("/partially_corrupted"); - final int length = cellSize * dataBlocks * 2; - final byte[] bytes = StripedFileTestUtil.generateBytes(length); - DFSTestUtil.writeFile(fs, file, bytes); - - // corrupt the first data block - // find the first data node - int dnIndex = findFirstDataNode(file, cellSize * dataBlocks); - Assert.assertNotEquals(-1, dnIndex); - // find the first data block - LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient() - .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0); - final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb, - cellSize, dataBlocks, parityBlocks); - // find the first block file - File storageDir = cluster.getInstanceStorageDir(dnIndex, 0); - File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock()); - Assert.assertTrue("Block file does not exist", blkFile.exists()); - // corrupt the block file - LOG.info("Deliberately corrupting file " + blkFile.getName()); - try (FileOutputStream out = new FileOutputStream(blkFile)) { - out.write("corruption".getBytes()); + @Test(timeout=300000) + public void testReadWithDNFailure() throws IOException { + for (int fileLength : fileLengths) { + for (int dnFailureNum : dnFailureNums) { + try { + // setup a new cluster with no dead datanode + setup(); + testReadWithDNFailure(fileLength, dnFailureNum); + } catch (IOException ioe) { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + LOG.error("Failed to read file with DN failure:" + + " fileType = "+ fileType + + ", dnFailureNum = " + dnFailureNum); + } finally { + // tear down the cluster + tearDown(); + } + } } + } - verifyRead(file, length, bytes); + /** + * Corrupt tolerable number of block before reading. + * Verify the decoding works correctly. + */ + @Test(timeout=300000) + public void testReadCorruptedData() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + String src = "/corrupted_" + dataDelNum + "_" + parityDelNum; + testReadWithBlockCorrupted(src, fileLength, + dataDelNum, parityDelNum, false); + } + } + } + } + + /** + * Delete tolerable number of block before reading. + * Verify the decoding works correctly. 
+ */ + @Test(timeout=300000) + public void testReadCorruptedDataByDeleting() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + String src = "/deleted_" + dataDelNum + "_" + parityDelNum; + testReadWithBlockCorrupted(src, fileLength, + dataDelNum, parityDelNum, true); + } + } + } } private int findFirstDataNode(Path file, long length) throws IOException { @@ -159,87 +150,45 @@ public class TestReadStripedFileWithDecoding { return -1; } - private void verifyRead(Path file, int length, byte[] expected) + private void verifyRead(Path testPath, int length, byte[] expected) throws IOException { - // pread - try (FSDataInputStream fsdis = fs.open(file)) { - byte[] buf = new byte[length]; - int readLen = fsdis.read(0, buf, 0, buf.length); - Assert.assertEquals("The fileSize of file should be the same to write size", - length, readLen); - Assert.assertArrayEquals(expected, buf); - } - - // stateful read - ByteBuffer result = ByteBuffer.allocate(length); - ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - try (FSDataInputStream in = fs.open(file)) { - while ((ret = in.read(buf)) >= 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } - Assert.assertEquals("The length of file should be the same to write size", - length, readLen); - Assert.assertArrayEquals(expected, result.array()); + byte[] buffer = new byte[length + 100]; + StripedFileTestUtil.verifyLength(fs, testPath, length); + StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer); + StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer); + StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, + ByteBuffer.allocate(length + 100)); + StripedFileTestUtil.verifySeek(fs, testPath, length); } - private void testReadWithDNFailure(String file, int fileSize, - int startOffsetInFile) throws IOException { - final int failedDNIdx = 2; - Path testPath = new Path(file); - final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize); + private void testReadWithDNFailure(int fileLength, int dnFailureNum) + throws IOException { + String fileType = fileLength < (blockSize * dataBlocks) ? 
+ "smallFile" : "largeFile"; + String src = "/dnFailure_" + dnFailureNum + "_" + fileType; + LOG.info("testReadWithDNFailure: file = " + src + + ", fileSize = " + fileLength + + ", dnFailureNum = " + dnFailureNum); + + Path testPath = new Path(src); + final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); DFSTestUtil.writeFile(fs, testPath, bytes); // shut down the DN that holds an internal data block BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5, cellSize); - String name = (locs[0].getNames())[failedDNIdx]; - for (DataNode dn : cluster.getDataNodes()) { - int port = dn.getXferPort(); - if (name.contains(Integer.toString(port))) { - dn.shutdown(); - break; + for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) { + String name = (locs[0].getNames())[failedDnIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + } } } - // pread - try (FSDataInputStream fsdis = fs.open(testPath)) { - byte[] buf = new byte[fileSize]; - int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length); - Assert.assertEquals("The fileSize of file should be the same to write size", - fileSize - startOffsetInFile, readLen); - - byte[] expected = new byte[readLen]; - System.arraycopy(bytes, startOffsetInFile, expected, 0, - fileSize - startOffsetInFile); - - for (int i = startOffsetInFile; i < fileSize; i++) { - Assert.assertEquals("Byte at " + i + " should be the same", - expected[i - startOffsetInFile], buf[i - startOffsetInFile]); - } - } - - // stateful read - ByteBuffer result = ByteBuffer.allocate(fileSize); - ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - try (FSDataInputStream in = fs.open(testPath)) { - while ((ret = in.read(buf)) >= 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } - Assert.assertEquals("The length of file should be the same to write size", - fileSize, readLen); - Assert.assertArrayEquals(bytes, result.array()); + // check file length, pread, stateful read and seek + verifyRead(testPath, fileLength, bytes); } /** @@ -279,21 +228,8 @@ public class TestReadStripedFileWithDecoding { try { // do stateful read - ByteBuffer result = ByteBuffer.allocate(length); - ByteBuffer buf = ByteBuffer.allocate(1024); - int readLen = 0; - int ret; - try (FSDataInputStream in = fs.open(file)) { - while ((ret = in.read(buf)) >= 0) { - readLen += ret; - buf.flip(); - result.put(buf); - buf.clear(); - } - } - Assert.assertEquals("The length of file should be the same to write size", - length, readLen); - Assert.assertArrayEquals(bytes, result.array()); + StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes, + ByteBuffer.allocate(1024)); // check whether the corruption has been reported to the NameNode final FSNamesystem ns = cluster.getNamesystem(); @@ -341,4 +277,82 @@ public class TestReadStripedFileWithDecoding { DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); } } + + /** + * Test reading a file with some blocks(data blocks or parity blocks or both) + * deleted or corrupted. + * @param src file path + * @param fileLength file length + * @param dataBlkDelNum the deleted or corrupted number of data blocks. + * @param parityBlkDelNum the deleted or corrupted number of parity blocks. + * @param deleteBlockFile whether block file is deleted or corrupted. + * true is to delete the block file. + * false is to corrupt the content of the block file. 
+ * @throws IOException + */ + private void testReadWithBlockCorrupted(String src, int fileLength, + int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile) + throws IOException { + LOG.info("testReadWithBlockCorrupted: file = " + src + + ", dataBlkDelNum = " + dataBlkDelNum + + ", parityBlkDelNum = " + parityBlkDelNum + + ", deleteBlockFile? " + deleteBlockFile); + int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; + Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive", + dataBlkDelNum >= 0 && parityBlkDelNum >= 0); + Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " + + "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks); + + // write a file with the length of writeLen + Path srcPath = new Path(src); + final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); + DFSTestUtil.writeFile(fs, srcPath, bytes); + + // delete or corrupt some blocks + corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile); + + // check the file can be read after some blocks were deleted + verifyRead(srcPath, fileLength, bytes); + } + + private void corruptBlocks(Path srcPath, int dataBlkDelNum, + int parityBlkDelNum, boolean deleteBlockFile) throws IOException { + int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; + + LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath); + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + + int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks, + dataBlkDelNum); + Assert.assertNotNull(delDataBlkIndices); + int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks, + dataBlocks + parityBlocks, parityBlkDelNum); + Assert.assertNotNull(delParityBlkIndices); + + int[] delBlkIndices = new int[recoverBlkNum]; + System.arraycopy(delDataBlkIndices, 0, + delBlkIndices, 0, delDataBlkIndices.length); + System.arraycopy(delParityBlkIndices, 0, + delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length); + + ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum]; + for (int i = 0; i < recoverBlkNum; i++) { + delBlocks[i] = StripedBlockUtil + .constructInternalBlock(lastBlock.getBlock(), + cellSize, dataBlocks, delBlkIndices[i]); + if (deleteBlockFile) { + // delete the block file + cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]); + } else { + // corrupt the block file + cluster.corruptBlockOnDataNodes(delBlocks[i]); + } + } + } + + private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException { + return fs.getClient().getLocatedBlocks(filePath.toString(), + 0, Long.MAX_VALUE); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java new file mode 100644 index 00000000000..54487736004 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks; + +public class TestWriteStripedFileWithFailure { + public static final Log LOG = LogFactory + .getLog(TestReadStripedFileWithMissingBlocks.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + private final int smallFileLength = blockSize * dataBlocks - 123; + private final int largeFileLength = blockSize * dataBlocks + 123; + private final int[] fileLengths = {smallFileLength, largeFileLength}; + + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + // Test writing file with some Datanodes failure + @Test(timeout = 300000) + public void testWriteStripedFileWithDNFailure() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + try { + // setup a new cluster with no dead datanode + setup(); + writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum); + } catch (IOException ioe) { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + LOG.error("Failed to write file with DN failure:" + + " fileType = "+ fileType + + ", dataDelNum = " + dataDelNum + + ", parityDelNum = " + parityDelNum); + throw ioe; + } finally { + // tear down the cluster + tearDown(); + } + } + } + } + } + + /** + * Test writing a file with shutting down some DNs(data DNs or parity DNs or both). + * @param fileLength file length + * @param dataDNFailureNum the shutdown number of data DNs + * @param parityDNFailureNum the shutdown number of parity DNs + * @throws IOException + */ + private void writeFileWithDNFailure(int fileLength, + int dataDNFailureNum, int parityDNFailureNum) throws IOException { + String fileType = fileLength < (blockSize * dataBlocks) ? 
+ "smallFile" : "largeFile"; + String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum + + "_" + fileType; + LOG.info("writeFileWithDNFailure: file = " + src + + ", fileType = " + fileType + + ", dataDNFailureNum = " + dataDNFailureNum + + ", parityDNFailureNum = " + parityDNFailureNum); + + Path srcPath = new Path(src); + final AtomicInteger pos = new AtomicInteger(); + final FSDataOutputStream out = fs.create(srcPath); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream)out.getWrappedStream(); + + int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks, + dataDNFailureNum); + Assert.assertNotNull(dataDNFailureIndices); + int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks, + dataBlocks + parityBlocks, dataDNFailureNum); + Assert.assertNotNull(parityDNFailureIndices); + + int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum]; + System.arraycopy(dataDNFailureIndices, 0, failedDataNodes, + 0, dataDNFailureIndices.length); + System.arraycopy(parityDNFailureIndices, 0, failedDataNodes, + dataDNFailureIndices.length, parityDNFailureIndices.length); + + final int killPos = fileLength/2; + for (; pos.get() < fileLength; ) { + final int i = pos.getAndIncrement(); + if (i == killPos) { + for(int failedDn : failedDataNodes) { + StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos); + } + } + write(out, i); + } + out.close(); + + // make sure the expected number of Datanode have been killed + int dnFailureNum = dataDNFailureNum + parityDNFailureNum; + Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + // delete the file + fs.delete(srcPath, true); + } + + void write(FSDataOutputStream out, int i) throws IOException { + try { + out.write(StripedFileTestUtil.getByte(i)); + } catch (IOException e) { + throw new IOException("Failed at i=" + i, e); + } + } +} \ No newline at end of file