HDFS-12437. Fix test setup in TestLeaseRecoveryStriped.

This commit is contained in:
Andrew Wang 2017-09-19 16:42:20 -07:00
parent 51edaacd09
commit 12d9d7bc50
1 changed files with 119 additions and 49 deletions

View File

@ -19,8 +19,7 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import org.apache.commons.logging.Log; import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -28,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -40,34 +40,41 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox; import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
public class TestLeaseRecoveryStriped { public class TestLeaseRecoveryStriped {
public static final Log LOG = LogFactory public static final Logger LOG = LoggerFactory
.getLog(TestLeaseRecoveryStriped.class); .getLogger(TestLeaseRecoveryStriped.class);
private final ErasureCodingPolicy ecPolicy = private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy(); StripedFileTestUtil.getDefaultECPolicy();
private final int dataBlocks = ecPolicy.getNumDataUnits(); private final int dataBlocks = ecPolicy.getNumDataUnits();
private final int parityBlocks = ecPolicy.getNumParityUnits(); private final int parityBlocks = ecPolicy.getNumParityUnits();
private final int cellSize = ecPolicy.getCellSize(); private final int cellSize = ecPolicy.getCellSize();
private final int stripSize = dataBlocks * cellSize; private final int stripeSize = dataBlocks * cellSize;
private final int stripesPerBlock = 15; private final int stripesPerBlock = 4;
private final int blockSize = cellSize * stripesPerBlock; private final int blockSize = cellSize * stripesPerBlock;
private final int blockGroupSize = blockSize * dataBlocks; private final int blockGroupSize = blockSize * dataBlocks;
private static final int bytesPerChecksum = 512; private static final int bytesPerChecksum = 512;
static { static {
GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL); GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
GenericTestUtils.setLogLevel(DFSStripedOutputStream.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(BlockRecoveryWorker.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.DEBUG);
} }
static private final String fakeUsername = "fakeUser1"; static private final String fakeUsername = "fakeUser1";
@ -83,7 +90,7 @@ public class TestLeaseRecoveryStriped {
public void setup() throws IOException { public void setup() throws IOException {
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
false); false);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
@ -104,78 +111,118 @@ public class TestLeaseRecoveryStriped {
} }
} }
private int[][][] getBlockLengthsSuite() { private static class BlockLengths {
final int groups = 4; private final int[] blockLengths;
final int minNumCell = 3; private final long safeLength;
final int maxNumCell = 11;
final int minNumDelta = -4; BlockLengths(ErasureCodingPolicy policy, int[] blockLengths) {
final int maxNumDelta = 2; this.blockLengths = blockLengths;
int delta = 0; long[] longArray = Arrays.stream(blockLengths).asLongStream().toArray();
int[][][] blkLenSuite = new int[groups][][]; this.safeLength = StripedBlockUtil.getSafeLength(policy, longArray);
Random random = ThreadLocalRandom.current(); }
for (int i = 0; i < blkLenSuite.length; i++) {
if (i == blkLenSuite.length - 1) { @Override
delta = bytesPerChecksum; public String toString() {
} return new ToStringBuilder(this)
int[][] suite = new int[2][]; .append("blockLengths", getBlockLengths())
int[] lens = new int[dataBlocks + parityBlocks]; .append("safeLength", getSafeLength())
long[] lenInLong = new long[lens.length]; .toString();
for (int j = 0; j < lens.length; j++) { }
int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
int numDelta = j < dataBlocks ? /**
random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta : 0; * Length of each block in a block group.
lens[j] = cellSize * numCell + delta * numDelta; */
lenInLong[j] = lens[j]; public int[] getBlockLengths() {
} return blockLengths;
suite[0] = lens; }
suite[1] = new int[]{
(int) StripedBlockUtil.getSafeLength(ecPolicy, lenInLong)}; /**
blkLenSuite[i] = suite; * Safe length, calculated by the block lengths.
*/
public long getSafeLength() {
return safeLength;
} }
return blkLenSuite;
} }
private final int[][][] blockLengthsSuite = getBlockLengthsSuite(); private BlockLengths[] getBlockLengthsSuite() {
final int groups = 4;
final int minNumCell = 1;
final int maxNumCell = stripesPerBlock;
final int minNumDelta = -4;
final int maxNumDelta = 2;
BlockLengths[] suite = new BlockLengths[groups];
Random random = ThreadLocalRandom.current();
for (int i = 0; i < groups; i++) {
int[] blockLengths = new int[dataBlocks + parityBlocks];
for (int j = 0; j < blockLengths.length; j++) {
// Choose a random number of cells for the block
int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
// For data blocks, jitter the length a bit
int numDelta = 0;
if (i == groups - 1 && j < dataBlocks) {
numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) +
minNumDelta;
}
blockLengths[j] = (cellSize * numCell) + (bytesPerChecksum * numDelta);
}
suite[i] = new BlockLengths(ecPolicy, blockLengths);
}
return suite;
}
private final BlockLengths[] blockLengthsSuite = getBlockLengthsSuite();
@Test @Test
public void testLeaseRecovery() throws Exception { public void testLeaseRecovery() throws Exception {
LOG.info("blockLengthsSuite: " +
Arrays.toString(blockLengthsSuite));
for (int i = 0; i < blockLengthsSuite.length; i++) { for (int i = 0; i < blockLengthsSuite.length; i++) {
int[] blockLengths = blockLengthsSuite[i][0]; BlockLengths blockLengths = blockLengthsSuite[i];
int safeLength = blockLengthsSuite[i][1][0];
try { try {
runTest(blockLengths, safeLength); runTest(blockLengths.getBlockLengths(), blockLengths.getSafeLength());
} catch (Throwable e) { } catch (Throwable e) {
String msg = "failed testCase at i=" + i + ", blockLengths=" String msg = "failed testCase at i=" + i + ", blockLengths="
+ Arrays.toString(blockLengths) + "\n" + blockLengths + "\n"
+ StringUtils.stringifyException(e); + StringUtils.stringifyException(e);
Assert.fail(msg); Assert.fail(msg);
} }
} }
} }
private void runTest(int[] blockLengths, int safeLength) throws Exception { private void runTest(int[] blockLengths, long safeLength) throws Exception {
writePartialBlocks(blockLengths); writePartialBlocks(blockLengths);
recoverLease(); recoverLease();
List<Long> oldGS = new ArrayList<>(); List<Long> oldGS = new ArrayList<>();
oldGS.add(1001L); oldGS.add(1001L);
StripedFileTestUtil.checkData(dfs, p, safeLength, StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize); new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
// After recovery, storages are reported by primary DN. we should verify // After recovery, storages are reported by primary DN. we should verify
// storages reported by blockReport. // storages reported by blockReport.
cluster.restartNameNode(true); cluster.restartNameNode(true);
cluster.waitFirstBRCompleted(0, 10000); cluster.waitFirstBRCompleted(0, 10000);
StripedFileTestUtil.checkData(dfs, p, safeLength, StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize); new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
} }
/**
* Write a file with blocks of different lengths.
*
* This method depends on completing before the DFS socket timeout.
* Otherwise, the client will mark timed-out streamers as failed, and the
* write will fail if there are too many failed streamers.
*
* @param blockLengths lengths of blocks to write
* @throws Exception
*/
private void writePartialBlocks(int[] blockLengths) throws Exception { private void writePartialBlocks(int[] blockLengths) throws Exception {
final FSDataOutputStream out = dfs.create(p); final FSDataOutputStream out = dfs.create(p);
final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
.getWrappedStream(); .getWrappedStream();
int length = (stripesPerBlock - 1) * stripSize; int length = (stripesPerBlock - 1) * stripeSize;
int[] posToKill = getPosToKill(blockLengths); int[] posToKill = getPosToKill(blockLengths);
int checkingPos = nextCheckingPos(posToKill, 0); int checkingPos = nextCheckingPos(posToKill, 0);
Set<Integer> stoppedStreamerIndexes = new HashSet<>();
try { try {
for (int pos = 0; pos < length; pos++) { for (int pos = 0; pos < length; pos++) {
out.write(StripedFileTestUtil.getByte(pos)); out.write(StripedFileTestUtil.getByte(pos));
@ -183,15 +230,31 @@ public class TestLeaseRecoveryStriped {
for (int index : getIndexToStop(posToKill, pos)) { for (int index : getIndexToStop(posToKill, pos)) {
out.flush(); out.flush();
stripedOut.enqueueAllCurrentPackets(); stripedOut.enqueueAllCurrentPackets();
LOG.info("Stopping block stream idx {} at file offset {} block " +
"length {}", index, pos, blockLengths[index]);
StripedDataStreamer s = stripedOut.getStripedDataStreamer(index); StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
waitStreamerAllAcked(s); waitStreamerAllAcked(s);
waitByteSent(s, blockLengths[index]); waitByteSent(s, blockLengths[index]);
stopBlockStream(s); stopBlockStream(s);
stoppedStreamerIndexes.add(index);
} }
checkingPos = nextCheckingPos(posToKill, pos); checkingPos = nextCheckingPos(posToKill, pos);
} }
} }
} finally { } finally {
// Flush everything
out.flush();
stripedOut.enqueueAllCurrentPackets();
// Wait for streamers that weren't killed above to be written out
for (int i=0; i< blockLengths.length; i++) {
if (stoppedStreamerIndexes.contains(i)) {
continue;
}
StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
LOG.info("Waiting for block stream idx {} to reach length {}", i,
blockLengths[i]);
waitStreamerAllAcked(s);
}
DFSTestUtil.abortStream(stripedOut); DFSTestUtil.abortStream(stripedOut);
} }
} }
@ -210,7 +273,7 @@ public class TestLeaseRecoveryStriped {
int[] posToKill = new int[dataBlocks + parityBlocks]; int[] posToKill = new int[dataBlocks + parityBlocks];
for (int i = 0; i < dataBlocks; i++) { for (int i = 0; i < dataBlocks; i++) {
int numStripe = (blockLengths[i] - 1) / cellSize; int numStripe = (blockLengths[i] - 1) / cellSize;
posToKill[i] = numStripe * stripSize + i * cellSize posToKill[i] = numStripe * stripeSize + i * cellSize
+ blockLengths[i] % cellSize; + blockLengths[i] % cellSize;
if (blockLengths[i] % cellSize == 0) { if (blockLengths[i] % cellSize == 0) {
posToKill[i] += cellSize; posToKill[i] += cellSize;
@ -220,7 +283,7 @@ public class TestLeaseRecoveryStriped {
+ parityBlocks; i++) { + parityBlocks; i++) {
Preconditions.checkArgument(blockLengths[i] % cellSize == 0); Preconditions.checkArgument(blockLengths[i] % cellSize == 0);
int numStripe = (blockLengths[i]) / cellSize; int numStripe = (blockLengths[i]) / cellSize;
posToKill[i] = numStripe * stripSize; posToKill[i] = numStripe * stripeSize;
} }
return posToKill; return posToKill;
} }
@ -243,13 +306,20 @@ public class TestLeaseRecoveryStriped {
public Boolean get() { public Boolean get() {
return s.bytesSent >= byteSent; return s.bytesSent >= byteSent;
} }
}, 100, 3000); }, 100, 30000);
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new IOException("Timeout waiting for streamer " + s + ". Sent=" throw new IOException("Timeout waiting for streamer " + s + ". Sent="
+ s.bytesSent + ", expected=" + byteSent); + s.bytesSent + ", expected=" + byteSent);
} }
} }
/**
* Stop the block stream without immediately inducing a hard failure.
* Packets can continue to be queued until the streamer hits a socket timeout.
*
* @param s
* @throws Exception
*/
private void stopBlockStream(StripedDataStreamer s) throws Exception { private void stopBlockStream(StripedDataStreamer s) throws Exception {
IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream(); IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
Whitebox.setInternalState(s, "blockStream", Whitebox.setInternalState(s, "blockStream",
@ -257,8 +327,8 @@ public class TestLeaseRecoveryStriped {
} }
private void recoverLease() throws Exception { private void recoverLease() throws Exception {
final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser( final DistributedFileSystem dfs2 =
conf); (DistributedFileSystem) getFSAsAnotherUser(conf);
try { try {
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override