From d4035d42f02507bc89adce3f0450c36b58b201c1 Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Tue, 5 Sep 2017 16:33:29 -0700
Subject: [PATCH] HDFS-12377. Refactor TestReadStripedFileWithDecoding to avoid test timeouts.

---
 .../ReadStripedFileWithDecodingHelper.java    | 273 ++++++++++++++++
 .../hadoop/hdfs/StripedFileTestUtil.java      |   9 +-
 .../TestReadStripedFileWithDNFailure.java     | 107 ++++++
 .../hdfs/TestReadStripedFileWithDecoding.java | 309 ++----------------
 ...eadStripedFileWithDecodingCorruptData.java |  87 +++++
 ...eadStripedFileWithDecodingDeletedData.java |  88 +++++
 6 files changed, 596 insertions(+), 277 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
new file mode 100644
index 00000000000..4202969ee43
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility class for testing online recovery of striped files.
+ */
+abstract public class ReadStripedFileWithDecodingHelper {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ReadStripedFileWithDecodingHelper.class);
+
+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(org.apache.log4j.Level.ALL);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.DEBUG);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG);
+  }
+
+  protected static final ErasureCodingPolicy EC_POLICY =
+      StripedFileTestUtil.getDefaultECPolicy();
+  protected static final short NUM_DATA_UNITS =
+      (short) EC_POLICY.getNumDataUnits();
+  protected static final short NUM_PARITY_UNITS =
+      (short) EC_POLICY.getNumParityUnits();
+  protected static final int CELL_SIZE = EC_POLICY.getCellSize();
+  private static final int STRIPES_PER_BLOCK = 4;
+  protected static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+  private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_UNITS;
+
+  private static final int NUM_DATANODES = NUM_DATA_UNITS + NUM_PARITY_UNITS;
+
+  protected static final int[] FILE_LENGTHS =
+      {BLOCK_GROUP_SIZE - 123, BLOCK_GROUP_SIZE + 123};
+
+  public static MiniDFSCluster initializeCluster() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+        0);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATANODES)
+        .build();
+    myCluster.getFileSystem().getClient().setErasureCodingPolicy("/",
+        StripedFileTestUtil.getDefaultECPolicy().getName());
+    return myCluster;
+  }
+
+  public static void tearDownCluster(MiniDFSCluster cluster)
+      throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public static int findFirstDataNode(MiniDFSCluster cluster,
+      DistributedFileSystem dfs, Path file, long length) throws IOException {
+    BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
+    String name = (locs[0].getNames())[0];
+    int dnIndex = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      int port = dn.getXferPort();
+      if (name.contains(Integer.toString(port))) {
+        return dnIndex;
+      }
+      dnIndex++;
+    }
+    return -1;
+  }
+
+  /**
+   * Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS.
+   * Input for parameterized test classes.
+   *
+   * @return Test parameters.
+   */
+  public static Collection<Object[]> getParameters() {
+    ArrayList<Object[]> params = new ArrayList<>();
+    for (int fileLength : FILE_LENGTHS) {
+      for (int dataDelNum = 1; dataDelNum <= NUM_PARITY_UNITS; dataDelNum++) {
+        for (int parityDelNum = 0;
+             (dataDelNum + parityDelNum) <= NUM_PARITY_UNITS; parityDelNum++) {
+          params.add(new Object[] {fileLength, dataDelNum, parityDelNum});
+        }
+      }
+    }
+    return params;
+  }
+
+  public static void verifyRead(DistributedFileSystem dfs, Path testPath,
+      int length, byte[] expected) throws IOException {
+    LOG.info("verifyRead on path {}", testPath);
+    byte[] buffer = new byte[length + 100];
+    LOG.info("verifyRead verifyLength on path {}", testPath);
+    StripedFileTestUtil.verifyLength(dfs, testPath, length);
+    LOG.info("verifyRead verifyPread on path {}", testPath);
+    StripedFileTestUtil.verifyPread(dfs, testPath, length, expected, buffer);
+    LOG.info("verifyRead verifyStatefulRead on path {}", testPath);
+    StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
+        buffer);
+    LOG.info("verifyRead verifyStatefulRead2 on path {}", testPath);
+    StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
+        ByteBuffer.allocate(length + 100));
+    LOG.info("verifyRead verifySeek on path {}", testPath);
+    StripedFileTestUtil.verifySeek(dfs, testPath, length, EC_POLICY,
+        BLOCK_GROUP_SIZE);
+  }
+
+  public static void testReadWithDNFailure(MiniDFSCluster cluster,
+      DistributedFileSystem dfs, int fileLength, int dnFailureNum)
+      throws Exception {
+    String fileType = fileLength < (BLOCK_SIZE * NUM_DATA_UNITS) ?
+        "smallFile" : "largeFile";
+    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
+    LOG.info("testReadWithDNFailure: file = " + src
+        + ", fileSize = " + fileLength
+        + ", dnFailureNum = " + dnFailureNum);
+
+    Path testPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
+    DFSTestUtil.writeFile(dfs, testPath, bytes);
+    StripedFileTestUtil.waitBlockGroupsReported(dfs, src);
+
+    // shut down the DN that holds an internal data block
+    BlockLocation[] locs = dfs.getFileBlockLocations(testPath, CELL_SIZE * 5,
+        CELL_SIZE);
+    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
+      String name = (locs[0].getNames())[failedDnIdx];
+      for (DataNode dn : cluster.getDataNodes()) {
+        int port = dn.getXferPort();
+        if (name.contains(Integer.toString(port))) {
+          dn.shutdown();
+        }
+      }
+    }
+
+    // check file length, pread, stateful read and seek
+    verifyRead(dfs, testPath, fileLength, bytes);
+  }
+
+
+  /**
+   * Test reading a file with some blocks (data blocks, parity blocks, or both)
+   * deleted or corrupted.
+   * @param src file path
+   * @param fileNumBytes file length
+   * @param dataBlkDelNum the deleted or corrupted number of data blocks.
+   * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
+   * @param deleteBlockFile whether the block file is deleted or corrupted;
+   *                        true deletes the block file,
+   *                        false corrupts the contents of the block file.
+   * @throws IOException
+   */
+  public static void testReadWithBlockCorrupted(MiniDFSCluster cluster,
+      DistributedFileSystem dfs, String src, int fileNumBytes,
+      int dataBlkDelNum, int parityBlkDelNum,
+      boolean deleteBlockFile) throws IOException {
+    LOG.info("testReadWithBlockCorrupted: file = " + src
+        + ", dataBlkDelNum = " + dataBlkDelNum
+        + ", parityBlkDelNum = " + parityBlkDelNum
+        + ", deleteBlockFile? " + deleteBlockFile);
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+    Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be non-negative",
+        dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
+    Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
+        "should be at most " + NUM_PARITY_UNITS, recoverBlkNum <=
+        NUM_PARITY_UNITS);
+
+    // write a file of length fileNumBytes
+    Path srcPath = new Path(src);
+    final byte[] bytes = StripedFileTestUtil.generateBytes(fileNumBytes);
+    DFSTestUtil.writeFile(dfs, srcPath, bytes);
+
+    // delete or corrupt some blocks
+    corruptBlocks(cluster, dfs, srcPath, dataBlkDelNum, parityBlkDelNum,
+        deleteBlockFile);
+
+    // check the file can be read after some blocks were deleted
+    verifyRead(dfs, srcPath, fileNumBytes, bytes);
+  }
+
+  public static void corruptBlocks(MiniDFSCluster cluster,
+      DistributedFileSystem dfs, Path srcPath,
+      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
+      throws IOException {
+    LOG.info("corruptBlocks on path {}", srcPath);
+    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+
+    LocatedBlocks locatedBlocks = getLocatedBlocks(dfs, srcPath);
+    LocatedStripedBlock lastBlock =
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+
+    int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, NUM_DATA_UNITS,
+        dataBlkDelNum);
+    Assert.assertNotNull(delDataBlkIndices);
+    int[] delParityBlkIndices = StripedFileTestUtil.randomArray(NUM_DATA_UNITS,
+        NUM_DATA_UNITS + NUM_PARITY_UNITS, parityBlkDelNum);
+    Assert.assertNotNull(delParityBlkIndices);
+
+    int[] delBlkIndices = new int[recoverBlkNum];
+    System.arraycopy(delDataBlkIndices, 0,
+        delBlkIndices, 0, delDataBlkIndices.length);
+    System.arraycopy(delParityBlkIndices, 0,
+        delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
+
+    ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
+    for (int i = 0; i < recoverBlkNum; i++) {
+      delBlocks[i] = StripedBlockUtil
+          .constructInternalBlock(lastBlock.getBlock(),
+              CELL_SIZE, NUM_DATA_UNITS, delBlkIndices[i]);
+      if (deleteBlockFile) {
+        // delete the block file
+        LOG.info("Deleting block file {}", delBlocks[i]);
+        cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
+      } else {
+        // corrupt the block file
+        LOG.info("Corrupting block file {}", delBlocks[i]);
+        cluster.corruptBlockOnDataNodes(delBlocks[i]);
+      }
+    }
+  }
+
+  public static LocatedBlocks getLocatedBlocks(DistributedFileSystem dfs,
+      Path filePath) throws IOException {
+    return dfs.getClient().getLocatedBlocks(filePath.toString(),
+        0, Long.MAX_VALUE);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 057e94a09fa..1489e48195f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Joiner;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,12 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -40,6 +38,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -57,7 +57,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 
 public class StripedFileTestUtil {
-  public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StripedFileTestUtil.class);
 
   public static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
new file mode 100644
index 00000000000..40ac206dd89
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZE;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.FILE_LENGTHS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with failed DNs. This test is parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDNFailure {
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestReadStripedFileWithDNFailure.class);
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    cluster = initializeCluster();
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    tearDownCluster(cluster);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters() {
+    ArrayList<Object[]> params = new ArrayList<>();
+    for (int fileLength : FILE_LENGTHS) {
+      for (int i = 0; i < NUM_PARITY_UNITS; i++) {
+        params.add(new Object[] {fileLength, i+1});
+      }
+    }
+    return params;
+  }
+
+  private int fileLength;
+  private int dnFailureNum;
+
+  public TestReadStripedFileWithDNFailure(int fileLength, int dnFailureNum) {
+    this.fileLength = fileLength;
+    this.dnFailureNum = dnFailureNum;
+  }
+
+  /**
+   * Shut down a tolerable number of DataNodes before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test
+  public void testReadWithDNFailure() throws Exception {
+    try {
+      // setup a new cluster with no dead datanode
+      setup();
+      ReadStripedFileWithDecodingHelper.testReadWithDNFailure(cluster,
+          dfs, fileLength, dnFailureNum);
+    } catch (IOException ioe) {
+      String fileType = fileLength < (BLOCK_SIZE * NUM_DATA_UNITS) ?
+ "smallFile" : "largeFile"; + LOG.error("Failed to read file with DN failure:" + + " fileType = " + fileType + + ", dnFailureNum = " + dnFailureNum); + } finally { + // tear down the cluster + tearDown(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java index cb1640c0b6a..2fb9212f354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java @@ -17,222 +17,58 @@ */ package org.apache.hadoop.hdfs; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.log4j.Level; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; import org.junit.Rule; +import org.junit.Test; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -public class TestReadStripedFileWithDecoding { - static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class); +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster; +import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster; - static { - ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)) - .getLogger().setLevel(Level.ALL); - GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL); - GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL); - GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL); - } +public class TestReadStripedFileWithDecoding { + private static final Logger LOG = + LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class); private 
-  private DistributedFileSystem fs;
-  private final ErasureCodingPolicy ecPolicy =
-      StripedFileTestUtil.getDefaultECPolicy();
-  private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
-  private final short parityBlocks =
-      (short) ecPolicy.getNumParityUnits();
-  private final int numDNs = dataBlocks + parityBlocks;
-  private final int cellSize = ecPolicy.getCellSize();
-  private final int stripPerBlock = 4;
-  private final int blockSize = cellSize * stripPerBlock;
-  private final int blockGroupSize = blockSize * dataBlocks;
-  private final int smallFileLength = blockGroupSize - 123;
-  private final int largeFileLength = blockGroupSize + 123;
-  private final int[] fileLengths = {smallFileLength, largeFileLength};
-  private final int[] dnFailureNums = getDnFailureNums();
-
-  private int[] getDnFailureNums() {
-    int[] dnFailureNums = new int[parityBlocks];
-    for (int i = 0; i < dnFailureNums.length; i++) {
-      dnFailureNums[i] = i + 1;
-    }
-    return dnFailureNums;
-  }
+  private DistributedFileSystem dfs;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
 
   @Before
   public void setup() throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
-        false);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
-        StripedFileTestUtil.getDefaultECPolicy().getName());
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().setErasureCodingPolicy("/",
-        StripedFileTestUtil.getDefaultECPolicy().getName());
-    fs = cluster.getFileSystem();
+    cluster = initializeCluster();
+    dfs = cluster.getFileSystem();
   }
 
   @After
   public void tearDown() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  /**
-   * Shutdown tolerable number of Datanode before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadWithDNFailure() throws Exception {
-    for (int fileLength : fileLengths) {
-      for (int dnFailureNum : dnFailureNums) {
-        try {
-          // setup a new cluster with no dead datanode
-          setup();
-          testReadWithDNFailure(fileLength, dnFailureNum);
-        } catch (IOException ioe) {
-          String fileType = fileLength < (blockSize * dataBlocks) ?
-              "smallFile" : "largeFile";
-          LOG.error("Failed to read file with DN failure:"
-              + " fileType = "+ fileType
-              + ", dnFailureNum = " + dnFailureNum);
-        } finally {
-          // tear down the cluster
-          tearDown();
-        }
-      }
-    }
-  }
-
-  /**
-   * Corrupt tolerable number of block before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadCorruptedData() throws IOException {
-    for (int fileLength : fileLengths) {
-      for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
-        for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
-             parityDelNum++) {
-          String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
-          testReadWithBlockCorrupted(src, fileLength,
-              dataDelNum, parityDelNum, false);
-        }
-      }
-    }
-  }
-
-  /**
-   * Delete tolerable number of block before reading.
-   * Verify the decoding works correctly.
-   */
-  @Test(timeout=300000)
-  public void testReadCorruptedDataByDeleting() throws IOException {
-    for (int fileLength : fileLengths) {
-      for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
-        for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
-             parityDelNum++) {
-          String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
-          testReadWithBlockCorrupted(src, fileLength,
-              dataDelNum, parityDelNum, true);
-        }
-      }
-    }
-  }
-
-  private int findFirstDataNode(Path file, long length) throws IOException {
-    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
-    String name = (locs[0].getNames())[0];
-    int dnIndex = 0;
-    for (DataNode dn : cluster.getDataNodes()) {
-      int port = dn.getXferPort();
-      if (name.contains(Integer.toString(port))) {
-        return dnIndex;
-      }
-      dnIndex++;
-    }
-    return -1;
-  }
-
-  private void verifyRead(Path testPath, int length, byte[] expected)
-      throws IOException {
-    byte[] buffer = new byte[length + 100];
-    StripedFileTestUtil.verifyLength(fs, testPath, length);
-    StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
-    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
-    StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
-        ByteBuffer.allocate(length + 100));
-    StripedFileTestUtil.verifySeek(fs, testPath, length, ecPolicy,
-        blockGroupSize);
-  }
-
-  private void testReadWithDNFailure(int fileLength, int dnFailureNum)
-      throws Exception {
-    String fileType = fileLength < (blockSize * dataBlocks) ?
-        "smallFile" : "largeFile";
-    String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
-    LOG.info("testReadWithDNFailure: file = " + src
-        + ", fileSize = " + fileLength
-        + ", dnFailureNum = " + dnFailureNum);
-
-    Path testPath = new Path(src);
-    final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
-    DFSTestUtil.writeFile(fs, testPath, bytes);
-    StripedFileTestUtil.waitBlockGroupsReported(fs, src);
-
-    // shut down the DN that holds an internal data block
-    BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
-        cellSize);
-    for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
-      String name = (locs[0].getNames())[failedDnIdx];
-      for (DataNode dn : cluster.getDataNodes()) {
-        int port = dn.getXferPort();
-        if (name.contains(Integer.toString(port))) {
-          dn.shutdown();
-        }
-      }
-    }
-
-    // check file length, pread, stateful read and seek
-    verifyRead(testPath, fileLength, bytes);
+    tearDownCluster(cluster);
   }
 
   /**
@@ -245,15 +81,17 @@ public class TestReadStripedFileWithDecoding {
     final Path file = new Path("/corrupted");
     final int length = 10; // length of "corruption"
     final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
+    DFSTestUtil.writeFile(dfs, file, bytes);
 
     // corrupt the first data block
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    int dnIndex = ReadStripedFileWithDecodingHelper.findFirstDataNode(
+        cluster, dfs, file, CELL_SIZE * NUM_DATA_UNITS);
     Assert.assertNotEquals(-1, dnIndex);
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
     // find the first block file
     File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
     File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
@@ -272,7 +110,7 @@ public class TestReadStripedFileWithDecoding {
 
     try {
       // do stateful read
-      StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
+      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
           ByteBuffer.allocate(1024));
 
       // check whether the corruption has been reported to the NameNode
@@ -293,110 +131,35 @@ public class TestReadStripedFileWithDecoding {
     final Path file = new Path("/invalidate");
     final int length = 10;
     final byte[] bytes = StripedFileTestUtil.generateBytes(length);
-    DFSTestUtil.writeFile(fs, file, bytes);
+    DFSTestUtil.writeFile(dfs, file, bytes);
 
-    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    int dnIndex = findFirstDataNode(cluster, dfs, file,
+        CELL_SIZE * NUM_DATA_UNITS);
     Assert.assertNotEquals(-1, dnIndex);
-    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
-        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+        .get(0);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
-        cellSize, dataBlocks, parityBlocks);
+        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
     final Block b = blks[0].getBlock().getLocalBlock();
 
     DataNode dn = cluster.getDataNodes().get(dnIndex);
-    // disable the heartbeat from DN so that the invalidated block record is kept
-    // in NameNode until heartbeat expires and NN mark the dn as dead
+    // disable the heartbeat from DN so that the invalidated block record is
+    // kept in NameNode until heartbeat expires and NN mark the dn as dead
    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
 
     try {
       // delete the file
-      fs.delete(file, true);
+      dfs.delete(file, true);
       // check the block is added to invalidateBlocks
       final FSNamesystem fsn = cluster.getNamesystem();
       final BlockManager bm = fsn.getBlockManager();
-      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
       Assert.assertTrue(bm.containsInvalidateBlock(
           blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
     } finally {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
     }
   }
-
-  /**
-   * Test reading a file with some blocks(data blocks or parity blocks or both)
-   * deleted or corrupted.
-   * @param src file path
-   * @param fileLength file length
-   * @param dataBlkDelNum the deleted or corrupted number of data blocks.
-   * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
-   * @param deleteBlockFile whether block file is deleted or corrupted.
-   *                        true is to delete the block file.
-   *                        false is to corrupt the content of the block file.
-   * @throws IOException
-   */
-  private void testReadWithBlockCorrupted(String src, int fileLength,
-      int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
-      throws IOException {
-    LOG.info("testReadWithBlockCorrupted: file = " + src
-        + ", dataBlkDelNum = " + dataBlkDelNum
-        + ", parityBlkDelNum = " + parityBlkDelNum
-        + ", deleteBlockFile? " + deleteBlockFile);
" + deleteBlockFile); - int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; - Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive", - dataBlkDelNum >= 0 && parityBlkDelNum >= 0); - Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " + - "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks); - - // write a file with the length of writeLen - Path srcPath = new Path(src); - final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength); - DFSTestUtil.writeFile(fs, srcPath, bytes); - - // delete or corrupt some blocks - corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile); - - // check the file can be read after some blocks were deleted - verifyRead(srcPath, fileLength, bytes); - } - - private void corruptBlocks(Path srcPath, int dataBlkDelNum, - int parityBlkDelNum, boolean deleteBlockFile) throws IOException { - int recoverBlkNum = dataBlkDelNum + parityBlkDelNum; - - LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath); - LocatedStripedBlock lastBlock = - (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); - - int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks, - dataBlkDelNum); - Assert.assertNotNull(delDataBlkIndices); - int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks, - dataBlocks + parityBlocks, parityBlkDelNum); - Assert.assertNotNull(delParityBlkIndices); - - int[] delBlkIndices = new int[recoverBlkNum]; - System.arraycopy(delDataBlkIndices, 0, - delBlkIndices, 0, delDataBlkIndices.length); - System.arraycopy(delParityBlkIndices, 0, - delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length); - - ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum]; - for (int i = 0; i < recoverBlkNum; i++) { - delBlocks[i] = StripedBlockUtil - .constructInternalBlock(lastBlock.getBlock(), - cellSize, dataBlocks, delBlkIndices[i]); - if (deleteBlockFile) { - // delete the block file - cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]); - } else { - // corrupt the block file - cluster.corruptBlockOnDataNodes(delBlocks[i]); - } - } - } - - private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException { - return fs.getClient().getLocatedBlocks(filePath.toString(), - 0, Long.MAX_VALUE); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java new file mode 100644 index 00000000000..5a8fb4f0ee5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with corrupt files. This test is parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDecodingCorruptData {
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestReadStripedFileWithDecodingCorruptData.class);
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    cluster = initializeCluster();
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    tearDownCluster(cluster);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters() {
+    return ReadStripedFileWithDecodingHelper.getParameters();
+  }
+
+  private int fileLength;
+  private int dataDelNum;
+  private int parityDelNum;
+
+  public TestReadStripedFileWithDecodingCorruptData(int fileLength, int
+      dataDelNum, int parityDelNum) {
+    this.fileLength = fileLength;
+    this.dataDelNum = dataDelNum;
+    this.parityDelNum = parityDelNum;
+  }
+
+  /**
+   * Corrupt a tolerable number of blocks before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test
+  public void testReadCorruptedData() throws IOException {
+    String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
+    ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
+        dfs, src, fileLength, dataDelNum, parityDelNum, false);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java
new file mode 100644
index 00000000000..c267e8417be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with files with deleted blocks. This test is
+ * parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDecodingDeletedData {
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestReadStripedFileWithDecodingDeletedData.class);
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(300000);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    cluster = initializeCluster();
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    tearDownCluster(cluster);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getParameters() {
+    return ReadStripedFileWithDecodingHelper.getParameters();
+  }
+
+  private int fileLength;
+  private int dataDelNum;
+  private int parityDelNum;
+
+  public TestReadStripedFileWithDecodingDeletedData(int fileLength, int
+      dataDelNum, int parityDelNum) {
+    this.fileLength = fileLength;
+    this.dataDelNum = dataDelNum;
+    this.parityDelNum = parityDelNum;
+  }
+
+  /**
+   * Delete a tolerable number of blocks before reading.
+   * Verify that decoding works correctly.
+   */
+  @Test
+  public void testReadCorruptedDataByDeleting() throws IOException {
+    String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
+    ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
+        dfs, src, fileLength, dataDelNum, parityDelNum, true);
+  }
+}