HDFS-12377. Refactor TestReadStripedFileWithDecoding to avoid test timeouts.

This commit is contained in:
Andrew Wang 2017-09-05 16:33:29 -07:00
parent ad32759fd9
commit d4035d42f0
6 changed files with 596 additions and 277 deletions

View File

@ -0,0 +1,273 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
/**
* Utility class for testing online recovery of striped files.
*/
abstract public class ReadStripedFileWithDecodingHelper {
// Logger used by the static helper methods of this class.
static final Logger LOG =
LoggerFactory.getLogger(ReadStripedFileWithDecodingHelper.class);
// Turn up logging for block placement, the BlockManager (including its block
// log) and NameNode state changes as soon as this helper class is loaded.
static {
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
.getLogger().setLevel(org.apache.log4j.Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.DEBUG);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG);
}
// Erasure coding policy used by all helpers: the default one reported by
// StripedFileTestUtil.
protected static final ErasureCodingPolicy EC_POLICY =
StripedFileTestUtil.getDefaultECPolicy();
// Number of data units of EC_POLICY.
protected static final short NUM_DATA_UNITS =
(short) EC_POLICY.getNumDataUnits();
// Number of parity units of EC_POLICY.
protected static final short NUM_PARITY_UNITS =
(short) EC_POLICY.getNumParityUnits();
// Cell size of EC_POLICY.
protected static final int CELL_SIZE = EC_POLICY.getCellSize();
// Each block is sized to hold this many cells.
private static final int STRIPES_PER_BLOCK = 4;
// Block size configured on the test cluster (see initializeCluster).
protected static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
// Amount of file data covered by one block group: one block per data unit.
private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_UNITS;
// One DataNode per data unit plus one per parity unit.
private static final int NUM_DATANODES = NUM_DATA_UNITS + NUM_PARITY_UNITS;
// File lengths under test: slightly less than, and slightly more than, one
// block group.
protected static final int[] FILE_LENGTHS =
{BLOCK_GROUP_SIZE - 123, BLOCK_GROUP_SIZE + 123};
/**
 * Starts a {@link MiniDFSCluster} with {@code NUM_DATANODES} DataNodes,
 * configured for the erasure coding policy under test, and sets that policy
 * on the root directory.
 *
 * <p>If the policy cannot be set on "/", the freshly started cluster is shut
 * down before the failure is propagated, so no cluster is leaked.
 *
 * @return the running cluster; the caller is responsible for shutting it
 *         down, e.g. via {@link #tearDownCluster(MiniDFSCluster)}.
 * @throws IOException if the cluster cannot be started or configured.
 */
public static MiniDFSCluster initializeCluster() throws IOException {
  final String policyName = EC_POLICY.getName();

  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
  // NOTE(review): both replication stream limits are set to 0, presumably so
  // that the NameNode does not schedule reconstruction while the tests read
  // degraded files -- confirm.
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
      0);
  conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
      false);
  conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, policyName);

  MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(NUM_DATANODES)
      .build();
  boolean ready = false;
  try {
    myCluster.getFileSystem().getClient()
        .setErasureCodingPolicy("/", policyName);
    ready = true;
    return myCluster;
  } finally {
    if (!ready) {
      // Do not leave a half-configured cluster running behind a failure.
      myCluster.shutdown();
    }
  }
}
/**
 * Shuts the given cluster down; does nothing when {@code cluster} is null.
 *
 * @param cluster the cluster to stop, may be null.
 * @throws IOException declared for callers; see MiniDFSCluster#shutdown.
 */
public static void tearDownCluster(MiniDFSCluster cluster)
    throws IOException {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
}
/**
 * Finds the index, within {@code cluster.getDataNodes()}, of the DataNode
 * that is listed first for the first block location of {@code file}.
 *
 * <p>The DataNode is identified by its transfer port: the location name is
 * expected to end with {@code ":<xferPort>"}. Matching the whole port suffix
 * (rather than looking for the digits anywhere in the name) avoids picking a
 * DataNode whose port merely occurs inside the address or another port.
 *
 * @param cluster the cluster whose DataNodes are searched.
 * @param dfs file system used to look up the block locations.
 * @param file the file to inspect.
 * @param length length of the byte range, starting at offset 0, to look up.
 * @return the DataNode index, or -1 if the file has no block location or no
 *         DataNode matches.
 * @throws IOException if the block locations cannot be fetched.
 */
public static int findFirstDataNode(MiniDFSCluster cluster,
    DistributedFileSystem dfs, Path file, long length) throws IOException {
  BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
  if (locs == null || locs.length == 0) {
    return -1;
  }
  String[] names = locs[0].getNames();
  if (names == null || names.length == 0) {
    return -1;
  }
  final String name = names[0];
  int dnIndex = 0;
  for (DataNode dn : cluster.getDataNodes()) {
    if (name.endsWith(":" + dn.getXferPort())) {
      return dnIndex;
    }
    dnIndex++;
  }
  return -1;
}
/**
 * Builds the input for the parameterized test classes.
 *
 * <p>One {@code {fileLength, dataDelNum, parityDelNum}} triple is produced
 * for every file length in FILE_LENGTHS, every {@code dataDelNum} from 1 to
 * NUM_PARITY_UNITS, and every {@code parityDelNum} from 0 upwards such that
 * {@code dataDelNum + parityDelNum} does not exceed NUM_PARITY_UNITS.
 *
 * @return Test parameters.
 */
public static Collection<Object[]> getParameters() {
  ArrayList<Object[]> combos = new ArrayList<>();
  for (int len : FILE_LENGTHS) {
    int dataLost = 1;
    while (dataLost <= NUM_PARITY_UNITS) {
      // Remaining failure budget once dataLost data blocks are gone.
      final int parityBudget = NUM_PARITY_UNITS - dataLost;
      for (int parityLost = 0; parityLost <= parityBudget; parityLost++) {
        combos.add(new Object[] {len, dataLost, parityLost});
      }
      dataLost++;
    }
  }
  return combos;
}
/**
 * Runs the read verifications of StripedFileTestUtil against a file, in
 * order: length, positional read, stateful read into a byte array, stateful
 * read into a ByteBuffer, and seek. Each step is announced in the log.
 *
 * @param dfs file system holding the file.
 * @param testPath the file to read.
 * @param length expected length of the file.
 * @param expected expected content of the file.
 * @throws IOException if any of the reads fails.
 */
public static void verifyRead(DistributedFileSystem dfs, Path testPath,
    int length, byte[] expected) throws IOException {
  LOG.info("verifyRead on path {}", testPath);
  // Both read buffers are sized 100 bytes larger than the file.
  final int capacity = length + 100;
  final byte[] heapBuffer = new byte[capacity];

  LOG.info("verifyRead verifyLength on path {}", testPath);
  StripedFileTestUtil.verifyLength(dfs, testPath, length);

  LOG.info("verifyRead verifyPread on path {}", testPath);
  StripedFileTestUtil.verifyPread(dfs, testPath, length, expected,
      heapBuffer);

  LOG.info("verifyRead verifyStatefulRead on path {}", testPath);
  StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
      heapBuffer);

  LOG.info("verifyRead verifyStatefulRead2 on path {}", testPath);
  final ByteBuffer nioBuffer = ByteBuffer.allocate(capacity);
  StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
      nioBuffer);

  LOG.info("verifyRead verifySeek on path {}", testPath);
  StripedFileTestUtil.verifySeek(dfs, testPath, length, EC_POLICY,
      BLOCK_GROUP_SIZE);
}
/**
 * Writes a file, shuts down {@code dnFailureNum} of the DataNodes listed for
 * one of its block locations, and verifies that the file can still be read
 * correctly.
 *
 * <p>A DataNode is matched to a location name by its transfer port, which is
 * expected to be the {@code ":<port>"} suffix of the name. The method fails
 * if a location cannot be mapped to a DataNode, because the read would then
 * be verified with fewer failures than requested.
 *
 * @param cluster cluster whose DataNodes are shut down.
 * @param dfs file system of {@code cluster}.
 * @param fileLength length of the file to write.
 * @param dnFailureNum number of DataNodes to shut down before reading.
 * @throws Exception if writing, shutting down or reading fails.
 */
public static void testReadWithDNFailure(MiniDFSCluster cluster,
    DistributedFileSystem dfs, int fileLength, int dnFailureNum)
    throws Exception {
  String fileType = fileLength < BLOCK_GROUP_SIZE ?
      "smallFile" : "largeFile";
  String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
  LOG.info("testReadWithDNFailure: file = {}, fileSize = {},"
      + " dnFailureNum = {}", src, fileLength, dnFailureNum);

  Path testPath = new Path(src);
  final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
  DFSTestUtil.writeFile(dfs, testPath, bytes);
  StripedFileTestUtil.waitBlockGroupsReported(dfs, src);

  // shut down the DN that holds an internal data block
  // NOTE(review): the locations are looked up for one cell starting at
  // offset CELL_SIZE * 5; presumably the returned names list the DataNodes
  // of the enclosing block group -- confirm.
  BlockLocation[] locs = dfs.getFileBlockLocations(testPath, CELL_SIZE * 5,
      CELL_SIZE);
  final String[] names = locs[0].getNames();
  Assert.assertTrue("Cannot fail " + dnFailureNum + " DataNodes, only "
      + names.length + " locations reported",
      dnFailureNum <= names.length);
  for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
    final String name = names[failedDnIdx];
    boolean stopped = false;
    for (DataNode dn : cluster.getDataNodes()) {
      // Compare the complete port suffix; a plain substring search could
      // match digits inside the address or inside a longer port number.
      if (name.endsWith(":" + dn.getXferPort())) {
        dn.shutdown();
        stopped = true;
        break;
      }
    }
    Assert.assertTrue("No DataNode found for block location " + name,
        stopped);
  }

  // check file length, pread, stateful read and seek
  verifyRead(dfs, testPath, fileLength, bytes);
}
/**
 * Test reading a file with some blocks(data blocks or parity blocks or both)
 * deleted or corrupted.
 *
 * <p>The file is written first, then the requested number of internal
 * blocks is deleted or corrupted, and finally the file is read back and
 * compared with the written content.
 *
 * @param cluster cluster holding the block files to delete or corrupt.
 * @param dfs file system of {@code cluster}.
 * @param src file path
 * @param fileNumBytes file length
 * @param dataBlkDelNum the deleted or corrupted number of data blocks;
 *                      must not be negative.
 * @param parityBlkDelNum the deleted or corrupted number of parity blocks;
 *                        must not be negative.
 * @param deleteBlockFile whether block file is deleted or corrupted.
 *                        true is to delete the block file.
 *                        false is to corrupt the content of the block file.
 * @throws IOException if writing, corrupting or reading the file fails.
 */
public static void testReadWithBlockCorrupted(MiniDFSCluster cluster,
    DistributedFileSystem dfs, String src, int fileNumBytes,
    int dataBlkDelNum, int parityBlkDelNum,
    boolean deleteBlockFile) throws IOException {
  LOG.info("testReadWithBlockCorrupted: file = {}, dataBlkDelNum = {},"
      + " parityBlkDelNum = {}, deleteBlockFile? {}",
      src, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);

  int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
  // The messages state exactly what is checked: zero is accepted for either
  // count, and only the upper bound of the sum is enforced.
  Assert.assertTrue(
      "dataBlkDelNum and parityBlkDelNum should be non-negative",
      dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
  Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum "
      + "should not exceed " + NUM_PARITY_UNITS,
      recoverBlkNum <= NUM_PARITY_UNITS);

  // write a file of fileNumBytes bytes
  Path srcPath = new Path(src);
  final byte[] bytes = StripedFileTestUtil.generateBytes(fileNumBytes);
  DFSTestUtil.writeFile(dfs, srcPath, bytes);

  // delete or corrupt some blocks
  corruptBlocks(cluster, dfs, srcPath, dataBlkDelNum, parityBlkDelNum,
      deleteBlockFile);

  // check the file can be read after some blocks were deleted or corrupted
  verifyRead(dfs, srcPath, fileNumBytes, bytes);
}
/**
 * Deletes or corrupts internal blocks of the last block group of a file.
 *
 * <p>The data block indices are drawn from {@code [0, NUM_DATA_UNITS)} and
 * the parity block indices from
 * {@code [NUM_DATA_UNITS, NUM_DATA_UNITS + NUM_PARITY_UNITS)} via
 * StripedFileTestUtil#randomArray.
 *
 * @param cluster cluster holding the block files.
 * @param dfs file system used to look up the located blocks.
 * @param srcPath the file whose blocks are damaged.
 * @param dataBlkDelNum number of data blocks to damage.
 * @param parityBlkDelNum number of parity blocks to damage.
 * @param deleteBlockFile true to delete the block files, false to corrupt
 *                        their content.
 * @throws IOException if the block locations cannot be fetched or a block
 *                     file cannot be damaged.
 */
public static void corruptBlocks(MiniDFSCluster cluster,
    DistributedFileSystem dfs, Path srcPath,
    int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
    throws IOException {
  LOG.info("corruptBlocks on path {}", srcPath);
  final int totalToDamage = dataBlkDelNum + parityBlkDelNum;

  final LocatedBlocks located = getLocatedBlocks(dfs, srcPath);
  final LocatedStripedBlock lastGroup =
      (LocatedStripedBlock) located.getLastLocatedBlock();

  final int[] dataIdx =
      StripedFileTestUtil.randomArray(0, NUM_DATA_UNITS, dataBlkDelNum);
  Assert.assertNotNull(dataIdx);
  final int[] parityIdx = StripedFileTestUtil.randomArray(NUM_DATA_UNITS,
      NUM_DATA_UNITS + NUM_PARITY_UNITS, parityBlkDelNum);
  Assert.assertNotNull(parityIdx);

  // Data indices first, parity indices after them.
  final int[] victims = new int[totalToDamage];
  System.arraycopy(dataIdx, 0, victims, 0, dataIdx.length);
  System.arraycopy(parityIdx, 0, victims, dataIdx.length, parityIdx.length);

  for (int idxInGroup : victims) {
    final ExtendedBlock internalBlock =
        StripedBlockUtil.constructInternalBlock(lastGroup.getBlock(),
            CELL_SIZE, NUM_DATA_UNITS, idxInGroup);
    if (!deleteBlockFile) {
      // corrupt the block file
      LOG.info("Corrupting block file {}", internalBlock);
      cluster.corruptBlockOnDataNodes(internalBlock);
    } else {
      // delete the block file
      LOG.info("Deleting block file {}", internalBlock);
      cluster.corruptBlockOnDataNodesByDeletingBlockFile(internalBlock);
    }
  }
}
/**
 * Fetches the located blocks of a file for the byte range starting at
 * offset 0 with length {@code Long.MAX_VALUE}.
 *
 * @param dfs file system whose client performs the lookup.
 * @param filePath the file to look up.
 * @return the located blocks reported by the client.
 * @throws IOException if the lookup fails.
 */
public static LocatedBlocks getLocatedBlocks(DistributedFileSystem dfs,
    Path filePath) throws IOException {
  final String src = filePath.toString();
  final long start = 0;
  return dfs.getClient().getLocatedBlocks(src, start, Long.MAX_VALUE);
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -27,12 +25,12 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.io.IOUtils;
@ -40,6 +38,8 @@
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
@ -57,7 +57,8 @@
import static org.junit.Assert.assertEquals;
public class StripedFileTestUtil {
public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
public static final Logger LOG =
LoggerFactory.getLogger(StripedFileTestUtil.class);
public static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.FILE_LENGTHS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
/**
 * Test online recovery with failed DNs. This test is parameterized.
 *
 * <p>Every parameter set shuts DataNodes down, so each test run starts its
 * own cluster and stops it again afterwards. At most one cluster is alive at
 * any time: {@link #setup()} stops a cluster that is still referenced before
 * it starts a new one, and {@link #tearDown()} clears the reference.
 */
@RunWith(Parameterized.class)
public class TestReadStripedFileWithDNFailure {
  static final Logger LOG =
      LoggerFactory.getLogger(TestReadStripedFileWithDNFailure.class);

  private static MiniDFSCluster cluster;
  private static DistributedFileSystem dfs;

  @Rule
  public Timeout globalTimeout = new Timeout(300000);

  /**
   * Starts a fresh cluster, stopping the previously started one first so
   * that it is not leaked when this method is called again from a test.
   */
  @BeforeClass
  public static void setup() throws IOException {
    tearDown();
    cluster = initializeCluster();
    dfs = cluster.getFileSystem();
  }

  /**
   * Stops the current cluster, if any, and forgets it so that a later call
   * does not shut the same cluster down twice.
   */
  @AfterClass
  public static void tearDown() throws IOException {
    try {
      tearDownCluster(cluster);
    } finally {
      cluster = null;
      dfs = null;
    }
  }

  /**
   * @return one {@code {fileLength, dnFailureNum}} pair for every file
   *         length in FILE_LENGTHS and every failure count from 1 to
   *         NUM_PARITY_UNITS.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> getParameters() {
    ArrayList<Object[]> params = new ArrayList<>();
    for (int fileLength : FILE_LENGTHS) {
      for (int failures = 1; failures <= NUM_PARITY_UNITS; failures++) {
        params.add(new Object[] {fileLength, failures});
      }
    }
    return params;
  }

  private final int fileLength;
  private final int dnFailureNum;

  public TestReadStripedFileWithDNFailure(int fileLength, int dnFailureNum) {
    this.fileLength = fileLength;
    this.dnFailureNum = dnFailureNum;
  }

  /**
   * Shutdown tolerable number of Datanode before reading.
   * Verify the decoding works correctly.
   *
   * <p>An {@link IOException} is logged together with the parameters of the
   * run and then rethrown, so that a failed read fails the test instead of
   * being silently ignored.
   */
  @Test
  public void testReadWithDNFailure() throws Exception {
    try {
      // setup a new cluster with no dead datanode
      setup();
      ReadStripedFileWithDecodingHelper.testReadWithDNFailure(cluster,
          dfs, fileLength, dnFailureNum);
    } catch (IOException ioe) {
      String fileType = fileLength < (BLOCK_SIZE * NUM_DATA_UNITS) ?
          "smallFile" : "largeFile";
      LOG.error("Failed to read file with DN failure:"
          + " fileType = {}, dnFailureNum = {}", fileType, dnFailureNum, ioe);
      throw ioe;
    } finally {
      // tear down the cluster
      tearDown();
    }
  }
}

View File

@ -17,222 +17,58 @@
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class TestReadStripedFileWithDecoding {
static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
static {
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
.getLogger().setLevel(Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
}
public class TestReadStripedFileWithDecoding {
private static final Logger LOG =
LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class);
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
private final short parityBlocks =
(short) ecPolicy.getNumParityUnits();
private final int numDNs = dataBlocks + parityBlocks;
private final int cellSize = ecPolicy.getCellSize();
private final int stripPerBlock = 4;
private final int blockSize = cellSize * stripPerBlock;
private final int blockGroupSize = blockSize * dataBlocks;
private final int smallFileLength = blockGroupSize - 123;
private final int largeFileLength = blockGroupSize + 123;
private final int[] fileLengths = {smallFileLength, largeFileLength};
private final int[] dnFailureNums = getDnFailureNums();
private int[] getDnFailureNums() {
int[] dnFailureNums = new int[parityBlocks];
for (int i = 0; i < dnFailureNums.length; i++) {
dnFailureNums[i] = i + 1;
}
return dnFailureNums;
}
private DistributedFileSystem dfs;
@Rule
public Timeout globalTimeout = new Timeout(300000);
@Before
public void setup() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
false);
conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
StripedFileTestUtil.getDefaultECPolicy().getName());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().setErasureCodingPolicy("/",
StripedFileTestUtil.getDefaultECPolicy().getName());
fs = cluster.getFileSystem();
cluster = initializeCluster();
dfs = cluster.getFileSystem();
}
@After
public void tearDown() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
/**
* Shutdown tolerable number of Datanode before reading.
* Verify the decoding works correctly.
*/
@Test(timeout=300000)
public void testReadWithDNFailure() throws Exception {
for (int fileLength : fileLengths) {
for (int dnFailureNum : dnFailureNums) {
try {
// setup a new cluster with no dead datanode
setup();
testReadWithDNFailure(fileLength, dnFailureNum);
} catch (IOException ioe) {
String fileType = fileLength < (blockSize * dataBlocks) ?
"smallFile" : "largeFile";
LOG.error("Failed to read file with DN failure:"
+ " fileType = "+ fileType
+ ", dnFailureNum = " + dnFailureNum);
} finally {
// tear down the cluster
tearDown();
}
}
}
}
/**
* Corrupt tolerable number of block before reading.
* Verify the decoding works correctly.
*/
@Test(timeout=300000)
public void testReadCorruptedData() throws IOException {
for (int fileLength : fileLengths) {
for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
parityDelNum++) {
String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
testReadWithBlockCorrupted(src, fileLength,
dataDelNum, parityDelNum, false);
}
}
}
}
/**
* Delete tolerable number of block before reading.
* Verify the decoding works correctly.
*/
@Test(timeout=300000)
public void testReadCorruptedDataByDeleting() throws IOException {
for (int fileLength : fileLengths) {
for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
parityDelNum++) {
String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
testReadWithBlockCorrupted(src, fileLength,
dataDelNum, parityDelNum, true);
}
}
}
}
private int findFirstDataNode(Path file, long length) throws IOException {
BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
String name = (locs[0].getNames())[0];
int dnIndex = 0;
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
return dnIndex;
}
dnIndex++;
}
return -1;
}
private void verifyRead(Path testPath, int length, byte[] expected)
throws IOException {
byte[] buffer = new byte[length + 100];
StripedFileTestUtil.verifyLength(fs, testPath, length);
StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
ByteBuffer.allocate(length + 100));
StripedFileTestUtil.verifySeek(fs, testPath, length, ecPolicy,
blockGroupSize);
}
private void testReadWithDNFailure(int fileLength, int dnFailureNum)
throws Exception {
String fileType = fileLength < (blockSize * dataBlocks) ?
"smallFile" : "largeFile";
String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
LOG.info("testReadWithDNFailure: file = " + src
+ ", fileSize = " + fileLength
+ ", dnFailureNum = " + dnFailureNum);
Path testPath = new Path(src);
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, testPath, bytes);
StripedFileTestUtil.waitBlockGroupsReported(fs, src);
// shut down the DN that holds an internal data block
BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
cellSize);
for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
String name = (locs[0].getNames())[failedDnIdx];
for (DataNode dn : cluster.getDataNodes()) {
int port = dn.getXferPort();
if (name.contains(Integer.toString(port))) {
dn.shutdown();
}
}
}
// check file length, pread, stateful read and seek
verifyRead(testPath, fileLength, bytes);
tearDownCluster(cluster);
}
/**
@ -245,15 +81,17 @@ public void testReportBadBlock() throws IOException {
final Path file = new Path("/corrupted");
final int length = 10; // length of "corruption"
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
DFSTestUtil.writeFile(dfs, file, bytes);
// corrupt the first data block
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
int dnIndex = ReadStripedFileWithDecodingHelper.findFirstDataNode(
cluster, dfs, file, CELL_SIZE * NUM_DATA_UNITS);
Assert.assertNotEquals(-1, dnIndex);
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
.getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
cellSize, dataBlocks, parityBlocks);
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
// find the first block file
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
@ -272,7 +110,7 @@ public void testReportBadBlock() throws IOException {
try {
// do stateful read
StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
ByteBuffer.allocate(1024));
// check whether the corruption has been reported to the NameNode
@ -293,110 +131,35 @@ public void testInvalidateBlock() throws IOException {
final Path file = new Path("/invalidate");
final int length = 10;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
DFSTestUtil.writeFile(fs, file, bytes);
DFSTestUtil.writeFile(dfs, file, bytes);
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
int dnIndex = findFirstDataNode(cluster, dfs, file,
CELL_SIZE * NUM_DATA_UNITS);
Assert.assertNotEquals(-1, dnIndex);
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
.getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
.get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
cellSize, dataBlocks, parityBlocks);
CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
final Block b = blks[0].getBlock().getLocalBlock();
DataNode dn = cluster.getDataNodes().get(dnIndex);
// disable the heartbeat from DN so that the invalidated block record is kept
// in NameNode until heartbeat expires and NN mark the dn as dead
// disable the heartbeat from DN so that the invalidated block record is
// kept in NameNode until heartbeat expires and NN mark the dn as dead
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
try {
// delete the file
fs.delete(file, true);
dfs.delete(file, true);
// check the block is added to invalidateBlocks
final FSNamesystem fsn = cluster.getNamesystem();
final BlockManager bm = fsn.getBlockManager();
DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
Assert.assertTrue(bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
} finally {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
}
}
/**
* Test reading a file with some blocks(data blocks or parity blocks or both)
* deleted or corrupted.
* @param src file path
* @param fileLength file length
* @param dataBlkDelNum the deleted or corrupted number of data blocks.
* @param parityBlkDelNum the deleted or corrupted number of parity blocks.
* @param deleteBlockFile whether block file is deleted or corrupted.
* true is to delete the block file.
* false is to corrupt the content of the block file.
* @throws IOException
*/
private void testReadWithBlockCorrupted(String src, int fileLength,
int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
throws IOException {
LOG.info("testReadWithBlockCorrupted: file = " + src
+ ", dataBlkDelNum = " + dataBlkDelNum
+ ", parityBlkDelNum = " + parityBlkDelNum
+ ", deleteBlockFile? " + deleteBlockFile);
int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
"should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
// write a file with the length of writeLen
Path srcPath = new Path(src);
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(fs, srcPath, bytes);
// delete or corrupt some blocks
corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
// check the file can be read after some blocks were deleted
verifyRead(srcPath, fileLength, bytes);
}
private void corruptBlocks(Path srcPath, int dataBlkDelNum,
int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
LocatedStripedBlock lastBlock =
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
dataBlkDelNum);
Assert.assertNotNull(delDataBlkIndices);
int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
dataBlocks + parityBlocks, parityBlkDelNum);
Assert.assertNotNull(delParityBlkIndices);
int[] delBlkIndices = new int[recoverBlkNum];
System.arraycopy(delDataBlkIndices, 0,
delBlkIndices, 0, delDataBlkIndices.length);
System.arraycopy(delParityBlkIndices, 0,
delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
for (int i = 0; i < recoverBlkNum; i++) {
delBlocks[i] = StripedBlockUtil
.constructInternalBlock(lastBlock.getBlock(),
cellSize, dataBlocks, delBlkIndices[i]);
if (deleteBlockFile) {
// delete the block file
cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
} else {
// corrupt the block file
cluster.corruptBlockOnDataNodes(delBlocks[i]);
}
}
}
private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
return fs.getClient().getLocatedBlocks(filePath.toString(),
0, Long.MAX_VALUE);
}
}

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
/**
 * Parameterized test of online recovery when block files have corrupt
 * content. All parameter sets share one cluster, started once for the class.
 */
@RunWith(Parameterized.class)
public class TestReadStripedFileWithDecodingCorruptData {
  static final Logger LOG =
      LoggerFactory.getLogger(TestReadStripedFileWithDecodingCorruptData.class);

  private static MiniDFSCluster cluster;
  private static DistributedFileSystem dfs;

  private int fileLength;
  private int dataDelNum;
  private int parityDelNum;

  @Rule
  public Timeout globalTimeout = new Timeout(300000);

  /**
   * @return the {@code {fileLength, dataDelNum, parityDelNum}} triples
   *         supplied by the helper class.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> getParameters() {
    return ReadStripedFileWithDecodingHelper.getParameters();
  }

  public TestReadStripedFileWithDecodingCorruptData(int fileLength, int
      dataDelNum, int parityDelNum) {
    this.fileLength = fileLength;
    this.dataDelNum = dataDelNum;
    this.parityDelNum = parityDelNum;
  }

  /** Starts the cluster shared by all parameter sets. */
  @BeforeClass
  public static void setup() throws IOException {
    cluster = ReadStripedFileWithDecodingHelper.initializeCluster();
    dfs = cluster.getFileSystem();
  }

  /** Stops the shared cluster. */
  @AfterClass
  public static void tearDown() throws IOException {
    ReadStripedFileWithDecodingHelper.tearDownCluster(cluster);
  }

  /**
   * Corrupt tolerable number of block before reading.
   * Verify the decoding works correctly.
   */
  @Test
  public void testReadCorruptedData() throws IOException {
    // false: corrupt the content of the block files instead of deleting them
    final boolean deleteBlockFile = false;
    final String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
    ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
        dfs, src, fileLength, dataDelNum, parityDelNum, deleteBlockFile);
  }
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
/**
 * Parameterized test of online recovery when block files have been deleted.
 * All parameter sets share one cluster, started once for the class.
 */
@RunWith(Parameterized.class)
public class TestReadStripedFileWithDecodingDeletedData {
  static final Logger LOG =
      LoggerFactory.getLogger(TestReadStripedFileWithDecodingDeletedData.class);

  private static MiniDFSCluster cluster;
  private static DistributedFileSystem dfs;

  private int fileLength;
  private int dataDelNum;
  private int parityDelNum;

  @Rule
  public Timeout globalTimeout = new Timeout(300000);

  /**
   * @return the {@code {fileLength, dataDelNum, parityDelNum}} triples
   *         supplied by the helper class.
   */
  @Parameterized.Parameters
  public static Collection<Object[]> getParameters() {
    return ReadStripedFileWithDecodingHelper.getParameters();
  }

  public TestReadStripedFileWithDecodingDeletedData(int fileLength, int
      dataDelNum, int parityDelNum) {
    this.fileLength = fileLength;
    this.dataDelNum = dataDelNum;
    this.parityDelNum = parityDelNum;
  }

  /** Starts the cluster shared by all parameter sets. */
  @BeforeClass
  public static void setup() throws IOException {
    cluster = ReadStripedFileWithDecodingHelper.initializeCluster();
    dfs = cluster.getFileSystem();
  }

  /** Stops the shared cluster. */
  @AfterClass
  public static void tearDown() throws IOException {
    ReadStripedFileWithDecodingHelper.tearDownCluster(cluster);
  }

  /**
   * Delete tolerable number of block before reading.
   * Verify the decoding works correctly.
   */
  @Test
  public void testReadCorruptedDataByDeleting() throws IOException {
    // true: delete the block files instead of corrupting their content
    final boolean deleteBlockFile = true;
    final String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
    ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
        dfs, src, fileLength, dataDelNum, parityDelNum, deleteBlockFile);
  }
}