diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
index 2846dbf7f00..36ac8b3f96e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -28,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
@@ -40,34 +40,41 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 
 public class TestLeaseRecoveryStriped {
-  public static final Log LOG = LogFactory
-      .getLog(TestLeaseRecoveryStriped.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(TestLeaseRecoveryStriped.class);
 
   private final ErasureCodingPolicy ecPolicy =
       StripedFileTestUtil.getDefaultECPolicy();
   private final int dataBlocks = ecPolicy.getNumDataUnits();
   private final int parityBlocks = ecPolicy.getNumParityUnits();
   private final int cellSize = ecPolicy.getCellSize();
-  private final int stripSize = dataBlocks * cellSize;
-  private final int stripesPerBlock = 15;
+  private final int stripeSize = dataBlocks * cellSize;
+  private final int stripesPerBlock = 4;
   private final int blockSize = cellSize * stripesPerBlock;
   private final int blockGroupSize = blockSize * dataBlocks;
   private static final int bytesPerChecksum = 512;
 
   static {
     GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DFSStripedOutputStream.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(BlockRecoveryWorker.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.DEBUG);
   }
 
   static private final String fakeUsername = "fakeUser1";
@@ -83,7 +90,7 @@ public class TestLeaseRecoveryStriped {
   public void setup() throws IOException {
     conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
-    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 60000L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         false);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
@@ -104,78 +111,118 @@ public class TestLeaseRecoveryStriped {
     }
   }
 
-  private int[][][] getBlockLengthsSuite() {
-    final int groups = 4;
-    final int minNumCell = 3;
-    final int maxNumCell = 11;
-    final int minNumDelta = -4;
-    final int maxNumDelta = 2;
-    int delta = 0;
-    int[][][] blkLenSuite = new int[groups][][];
-    Random random = ThreadLocalRandom.current();
-    for (int i = 0; i < blkLenSuite.length; i++) {
-      if (i == blkLenSuite.length - 1) {
-        delta = bytesPerChecksum;
-      }
-      int[][] suite = new int[2][];
-      int[] lens = new int[dataBlocks + parityBlocks];
-      long[] lenInLong = new long[lens.length];
-      for (int j = 0; j < lens.length; j++) {
-        int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
-        int numDelta = j < dataBlocks ?
-            random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta : 0;
-        lens[j] = cellSize * numCell + delta * numDelta;
-        lenInLong[j] = lens[j];
-      }
-      suite[0] = lens;
-      suite[1] = new int[]{
-          (int) StripedBlockUtil.getSafeLength(ecPolicy, lenInLong)};
-      blkLenSuite[i] = suite;
+  private static class BlockLengths {
+    private final int[] blockLengths;
+    private final long safeLength;
+
+    BlockLengths(ErasureCodingPolicy policy, int[] blockLengths) {
+      this.blockLengths = blockLengths;
+      long[] longArray = Arrays.stream(blockLengths).asLongStream().toArray();
+      this.safeLength = StripedBlockUtil.getSafeLength(policy, longArray);
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this)
+          .append("blockLengths", getBlockLengths())
+          .append("safeLength", getSafeLength())
+          .toString();
+    }
+
+    /**
+     * Length of each block in a block group.
+     */
+    public int[] getBlockLengths() {
+      return blockLengths;
+    }
+
+    /**
+     * Safe length, calculated from the block lengths.
+     */
+    public long getSafeLength() {
+      return safeLength;
     }
-    return blkLenSuite;
   }
 
-  private final int[][][] blockLengthsSuite = getBlockLengthsSuite();
+  private BlockLengths[] getBlockLengthsSuite() {
+    final int groups = 4;
+    final int minNumCell = 1;
+    final int maxNumCell = stripesPerBlock;
+    final int minNumDelta = -4;
+    final int maxNumDelta = 2;
+    BlockLengths[] suite = new BlockLengths[groups];
+    Random random = ThreadLocalRandom.current();
+    for (int i = 0; i < groups; i++) {
+      int[] blockLengths = new int[dataBlocks + parityBlocks];
+      for (int j = 0; j < blockLengths.length; j++) {
+        // Choose a random number of cells for the block
+        int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
+        // For the last group, jitter the data block lengths a bit
+        int numDelta = 0;
+        if (i == groups - 1 && j < dataBlocks) {
+          numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) +
+              minNumDelta;
+        }
+        blockLengths[j] = (cellSize * numCell) + (bytesPerChecksum * numDelta);
+      }
+      suite[i] = new BlockLengths(ecPolicy, blockLengths);
+    }
+    return suite;
+  }
+
+  private final BlockLengths[] blockLengthsSuite = getBlockLengthsSuite();
 
   @Test
   public void testLeaseRecovery() throws Exception {
+    LOG.info("blockLengthsSuite: " +
+        Arrays.toString(blockLengthsSuite));
     for (int i = 0; i < blockLengthsSuite.length; i++) {
-      int[] blockLengths = blockLengthsSuite[i][0];
-      int safeLength = blockLengthsSuite[i][1][0];
+      BlockLengths blockLengths = blockLengthsSuite[i];
       try {
-        runTest(blockLengths, safeLength);
+        runTest(blockLengths.getBlockLengths(), blockLengths.getSafeLength());
       } catch (Throwable e) {
         String msg = "failed testCase at i=" + i + ", blockLengths="
-            + Arrays.toString(blockLengths) + "\n"
+            + blockLengths + "\n"
            + StringUtils.stringifyException(e);
         Assert.fail(msg);
       }
     }
   }
 
-  private void runTest(int[] blockLengths, int safeLength) throws Exception {
+  private void runTest(int[] blockLengths, long safeLength) throws Exception {
     writePartialBlocks(blockLengths);
     recoverLease();
 
     List<Long> oldGS = new ArrayList<>();
     oldGS.add(1001L);
-    StripedFileTestUtil.checkData(dfs, p, safeLength,
+    StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
         new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
     // After recovery, storages are reported by primary DN. we should verify
     // storages reported by blockReport.
     cluster.restartNameNode(true);
     cluster.waitFirstBRCompleted(0, 10000);
-    StripedFileTestUtil.checkData(dfs, p, safeLength,
+    StripedFileTestUtil.checkData(dfs, p, (int)safeLength,
         new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
   }
 
+  /**
+   * Write a file with blocks of different lengths.
+   *
+   * This method depends on completing before the DFS socket timeout.
+   * Otherwise, the client will mark timed-out streamers as failed, and the
+   * write will fail if there are too many failed streamers.
+   *
+   * @param blockLengths lengths of blocks to write
+   * @throws Exception
+   */
   private void writePartialBlocks(int[] blockLengths) throws Exception {
     final FSDataOutputStream out = dfs.create(p);
     final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
         .getWrappedStream();
-    int length = (stripesPerBlock - 1) * stripSize;
+    int length = (stripesPerBlock - 1) * stripeSize;
     int[] posToKill = getPosToKill(blockLengths);
     int checkingPos = nextCheckingPos(posToKill, 0);
+    Set<Integer> stoppedStreamerIndexes = new HashSet<>();
     try {
       for (int pos = 0; pos < length; pos++) {
         out.write(StripedFileTestUtil.getByte(pos));
@@ -183,15 +230,31 @@ public class TestLeaseRecoveryStriped {
           for (int index : getIndexToStop(posToKill, pos)) {
             out.flush();
             stripedOut.enqueueAllCurrentPackets();
+            LOG.info("Stopping block stream idx {} at file offset {} block " +
+                "length {}", index, pos, blockLengths[index]);
             StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
             waitStreamerAllAcked(s);
             waitByteSent(s, blockLengths[index]);
             stopBlockStream(s);
+            stoppedStreamerIndexes.add(index);
           }
           checkingPos = nextCheckingPos(posToKill, pos);
         }
       }
     } finally {
+      // Flush everything
+      out.flush();
+      stripedOut.enqueueAllCurrentPackets();
+      // Wait for streamers that weren't killed above to be written out
+      for (int i = 0; i < blockLengths.length; i++) {
+        if (stoppedStreamerIndexes.contains(i)) {
+          continue;
+        }
+        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
+        LOG.info("Waiting for block stream idx {} to reach length {}", i,
+            blockLengths[i]);
+        waitStreamerAllAcked(s);
+      }
       DFSTestUtil.abortStream(stripedOut);
     }
   }
@@ -210,7 +273,7 @@ public class TestLeaseRecoveryStriped {
     int[] posToKill = new int[dataBlocks + parityBlocks];
     for (int i = 0; i < dataBlocks; i++) {
       int numStripe = (blockLengths[i] - 1) / cellSize;
-      posToKill[i] = numStripe * stripSize + i * cellSize
+      posToKill[i] = numStripe * stripeSize + i * cellSize
          + blockLengths[i] % cellSize;
       if (blockLengths[i] % cellSize == 0) {
        posToKill[i] += cellSize;
@@ -220,7 +283,7 @@ public class TestLeaseRecoveryStriped {
         + parityBlocks; i++) {
       Preconditions.checkArgument(blockLengths[i] % cellSize == 0);
       int numStripe = (blockLengths[i]) / cellSize;
-      posToKill[i] = numStripe * stripSize;
+      posToKill[i] = numStripe * stripeSize;
     }
     return posToKill;
   }
@@ -243,13 +306,20 @@ public class TestLeaseRecoveryStriped {
         public Boolean get() {
           return s.bytesSent >= byteSent;
         }
-      }, 100, 3000);
+      }, 100, 30000);
     } catch (TimeoutException e) {
       throw new IOException("Timeout waiting for streamer " + s + ". Sent="
for streamer " + s + ". Sent=" + s.bytesSent + ", expected=" + byteSent); } } + /** + * Stop the block stream without immediately inducing a hard failure. + * Packets can continue to be queued until the streamer hits a socket timeout. + * + * @param s + * @throws Exception + */ private void stopBlockStream(StripedDataStreamer s) throws Exception { IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream(); Whitebox.setInternalState(s, "blockStream", @@ -257,8 +327,8 @@ public class TestLeaseRecoveryStriped { } private void recoverLease() throws Exception { - final DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser( - conf); + final DistributedFileSystem dfs2 = + (DistributedFileSystem) getFSAsAnotherUser(conf); try { GenericTestUtils.waitFor(new Supplier() { @Override