HDFS-12497. Re-enable TestDFSStripedOutputStreamWithFailure tests. Contributed by Huafeng Wang.
This commit is contained in:
parent
6b7c87c945
commit
0477eff8be
|
@ -26,6 +26,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||||
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
|
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
|
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -77,30 +79,29 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
.getLogger().setLevel(Level.ALL);
|
.getLogger().setLevel(Level.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final int cellSize = 64 * 1024; //64k
|
||||||
|
private final int stripesPerBlock = 4;
|
||||||
private ErasureCodingPolicy ecPolicy;
|
private ErasureCodingPolicy ecPolicy;
|
||||||
private int dataBlocks;
|
private int dataBlocks;
|
||||||
private int parityBlocks;
|
private int parityBlocks;
|
||||||
private int cellSize;
|
|
||||||
private final int stripesPerBlock = 4;
|
|
||||||
private int blockSize;
|
private int blockSize;
|
||||||
private int blockGroupSize;
|
private int blockGroupSize;
|
||||||
|
|
||||||
private static final int FLUSH_POS =
|
private static final int FLUSH_POS =
|
||||||
9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
|
9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
|
||||||
|
|
||||||
public ErasureCodingPolicy getEcPolicy() {
|
public ECSchema getEcSchema() {
|
||||||
return StripedFileTestUtil.getDefaultECPolicy();
|
return StripedFileTestUtil.getDefaultECPolicy().getSchema();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Initialize erasure coding policy.
|
* Initialize erasure coding policy.
|
||||||
*/
|
*/
|
||||||
@Before
|
@Before
|
||||||
public void init(){
|
public void init() {
|
||||||
ecPolicy = getEcPolicy();
|
ecPolicy = new ErasureCodingPolicy(getEcSchema(), cellSize);
|
||||||
dataBlocks = ecPolicy.getNumDataUnits();
|
dataBlocks = ecPolicy.getNumDataUnits();
|
||||||
parityBlocks = ecPolicy.getNumParityUnits();
|
parityBlocks = ecPolicy.getNumParityUnits();
|
||||||
cellSize = ecPolicy.getCellSize();
|
|
||||||
blockSize = cellSize * stripesPerBlock;
|
blockSize = cellSize * stripesPerBlock;
|
||||||
blockGroupSize = blockSize * dataBlocks;
|
blockGroupSize = blockSize * dataBlocks;
|
||||||
dnIndexSuite = getDnIndexSuite();
|
dnIndexSuite = getDnIndexSuite();
|
||||||
|
@ -189,7 +190,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
private List<Integer> lengths;
|
private List<Integer> lengths;
|
||||||
|
|
||||||
Integer getLength(int i) {
|
Integer getLength(int i) {
|
||||||
return i >= 0 && i < lengths.size()? lengths.get(i): null;
|
return i >= 0 && i < lengths.size() ? lengths.get(i): null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Random RANDOM = new Random();
|
private static final Random RANDOM = new Random();
|
||||||
|
@ -220,6 +221,10 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
dfs = cluster.getFileSystem();
|
dfs = cluster.getFileSystem();
|
||||||
|
AddErasureCodingPolicyResponse[] res =
|
||||||
|
dfs.addErasureCodingPolicies(new ErasureCodingPolicy[]{ecPolicy});
|
||||||
|
ecPolicy = res[0].getPolicy();
|
||||||
|
dfs.enableErasureCodingPolicy(ecPolicy.getName());
|
||||||
DFSTestUtil.enableAllECPolicies(dfs);
|
DFSTestUtil.enableAllECPolicies(dfs);
|
||||||
dfs.mkdirs(dir);
|
dfs.mkdirs(dir);
|
||||||
dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
|
dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
|
||||||
|
@ -241,7 +246,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=240000)
|
@Test(timeout=300000)
|
||||||
public void testMultipleDatanodeFailure56() throws Exception {
|
public void testMultipleDatanodeFailure56() throws Exception {
|
||||||
runTestWithMultipleFailure(getLength(56));
|
runTestWithMultipleFailure(getLength(56));
|
||||||
}
|
}
|
||||||
|
@ -260,7 +265,8 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
|
|
||||||
@Test(timeout=240000)
|
@Test(timeout=240000)
|
||||||
public void testBlockTokenExpired() throws Exception {
|
public void testBlockTokenExpired() throws Exception {
|
||||||
final int length = dataBlocks * (blockSize - cellSize);
|
// Make sure killPos is greater than the length of one stripe
|
||||||
|
final int length = dataBlocks * cellSize * 3;
|
||||||
final HdfsConfiguration conf = newHdfsConfiguration();
|
final HdfsConfiguration conf = newHdfsConfiguration();
|
||||||
|
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
|
||||||
|
@ -300,13 +306,13 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
||||||
DatanodeReportType.LIVE);
|
DatanodeReportType.LIVE);
|
||||||
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
|
assertEquals("Mismatches number of live Dns", numDatanodes, info.length);
|
||||||
final Path dirFile = new Path(dir, "ecfile");
|
final Path dirFile = new Path(dir, "ecfile");
|
||||||
LambdaTestUtils.intercept(
|
LambdaTestUtils.intercept(
|
||||||
IOException.class,
|
IOException.class,
|
||||||
"File " + dirFile + " could only be written to " +
|
"File " + dirFile + " could only be written to " +
|
||||||
numDatanodes + " of the " + dataBlocks + " required nodes for " +
|
numDatanodes + " of the " + dataBlocks + " required nodes for " +
|
||||||
getEcPolicy().getName(),
|
ecPolicy.getName(),
|
||||||
() -> {
|
() -> {
|
||||||
try (FSDataOutputStream out = dfs.create(dirFile, true)) {
|
try (FSDataOutputStream out = dfs.create(dirFile, true)) {
|
||||||
out.write("something".getBytes());
|
out.write("something".getBytes());
|
||||||
|
@ -413,7 +419,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
cluster.triggerHeartbeats();
|
cluster.triggerHeartbeats();
|
||||||
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
||||||
DatanodeReportType.LIVE);
|
DatanodeReportType.LIVE);
|
||||||
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
|
assertEquals("Mismatches number of live Dns", numDatanodes, info.length);
|
||||||
Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
|
Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
|
||||||
int fileLength = cellSize - 1000;
|
int fileLength = cellSize - 1000;
|
||||||
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
|
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
|
||||||
|
@ -432,7 +438,7 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
try {
|
try {
|
||||||
LOG.info("runTest: dn=" + dn + ", length=" + length);
|
LOG.info("runTest: dn=" + dn + ", length=" + length);
|
||||||
setup(conf);
|
setup(conf);
|
||||||
runTest(length, new int[]{length/2}, new int[]{dn}, false);
|
runTest(length, new int[]{length / 2}, new int[]{dn}, false);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
final String err = "failed, dn=" + dn + ", length=" + length
|
final String err = "failed, dn=" + dn + ", length=" + length
|
||||||
+ StringUtils.stringifyException(e);
|
+ StringUtils.stringifyException(e);
|
||||||
|
@ -582,10 +588,10 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
long oldGS = -1; // the old GS before bumping
|
long oldGS = -1; // the old GS before bumping
|
||||||
List<Long> gsList = new ArrayList<>();
|
List<Long> gsList = new ArrayList<>();
|
||||||
final List<DatanodeInfo> killedDN = new ArrayList<>();
|
final List<DatanodeInfo> killedDN = new ArrayList<>();
|
||||||
int numKilled=0;
|
int numKilled = 0;
|
||||||
for(; pos.get() < length;) {
|
for(; pos.get() < length;) {
|
||||||
final int i = pos.getAndIncrement();
|
final int i = pos.getAndIncrement();
|
||||||
if (numKilled < killPos.length && i == killPos[numKilled]) {
|
if (numKilled < killPos.length && i == killPos[numKilled]) {
|
||||||
assertTrue(firstGS != -1);
|
assertTrue(firstGS != -1);
|
||||||
final long gs = getGenerationStamp(stripedOut);
|
final long gs = getGenerationStamp(stripedOut);
|
||||||
if (numKilled == 0) {
|
if (numKilled == 0) {
|
||||||
|
@ -706,8 +712,6 @@ public class TestDFSStripedOutputStreamWithFailure {
|
||||||
|
|
||||||
private void run(int offset) {
|
private void run(int offset) {
|
||||||
int base = getBase();
|
int base = getBase();
|
||||||
// TODO: Fix and re-enable these flaky tests. See HDFS-12417.
|
|
||||||
assumeTrue("Test has been temporarily disabled. See HDFS-12417.", false);
|
|
||||||
assumeTrue(base >= 0);
|
assumeTrue(base >= 0);
|
||||||
final int i = offset + base;
|
final int i = offset + base;
|
||||||
final Integer length = getLength(i);
|
final Integer length = getLength(i);
|
||||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.io.erasurecode.ECSchema;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This tests write operation of DFS striped file with a random erasure code
|
* This tests write operation of DFS striped file with a random erasure code
|
||||||
|
@ -28,18 +28,18 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
public class TestDFSStripedOutputStreamWithFailureWithRandomECPolicy extends
|
public class TestDFSStripedOutputStreamWithFailureWithRandomECPolicy extends
|
||||||
TestDFSStripedOutputStreamWithFailure {
|
TestDFSStripedOutputStreamWithFailure {
|
||||||
|
|
||||||
|
private final ECSchema schema;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
TestDFSStripedOutputStreamWithRandomECPolicy.class.getName());
|
TestDFSStripedOutputStreamWithRandomECPolicy.class.getName());
|
||||||
|
|
||||||
private ErasureCodingPolicy ecPolicy;
|
|
||||||
|
|
||||||
public TestDFSStripedOutputStreamWithFailureWithRandomECPolicy() {
|
public TestDFSStripedOutputStreamWithFailureWithRandomECPolicy() {
|
||||||
ecPolicy = StripedFileTestUtil.getRandomNonDefaultECPolicy();
|
schema = StripedFileTestUtil.getRandomNonDefaultECPolicy().getSchema();
|
||||||
LOG.info(ecPolicy);
|
LOG.info(schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ErasureCodingPolicy getEcPolicy() {
|
public ECSchema getEcSchema() {
|
||||||
return ecPolicy;
|
return schema;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue