From e6cb07520f935efde3e881de8f84ee7f6e0a746f Mon Sep 17 00:00:00 2001 From: Kai Zheng Date: Fri, 24 Jun 2016 17:39:32 +0800 Subject: [PATCH] HDFS-10460. Recompute block checksum for a particular range less than file size on the fly by reconstructing missed block. Contributed by Rakesh R --- .../hadoop/hdfs/FileChecksumHelper.java | 13 +- .../datatransfer/DataTransferProtocol.java | 5 +- .../hdfs/protocol/datatransfer/Sender.java | 4 +- .../src/main/proto/datatransfer.proto | 1 + .../hdfs/protocol/datatransfer/Receiver.java | 3 +- .../server/datanode/BlockChecksumHelper.java | 81 +++-- .../hdfs/server/datanode/DataXceiver.java | 5 +- .../StripedBlockChecksumReconstructor.java | 69 +++- .../apache/hadoop/hdfs/TestFileChecksum.java | 309 ++++++++++++++++-- 9 files changed, 415 insertions(+), 75 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index c213fa35a61..fe462f27553 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -454,10 +454,11 @@ void checksumBlocks() throws IOException { private boolean checksumBlockGroup( LocatedStripedBlock blockGroup) throws IOException { ExtendedBlock block = blockGroup.getBlock(); + long requestedNumBytes = block.getNumBytes(); if (getRemaining() < block.getNumBytes()) { - block.setNumBytes(getRemaining()); + requestedNumBytes = getRemaining(); } - setRemaining(getRemaining() - block.getNumBytes()); + setRemaining(getRemaining() - requestedNumBytes); StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, blockGroup.getLocations(), blockGroup.getBlockTokens(), @@ -468,7 +469,8 @@ private boolean checksumBlockGroup( boolean done = false; for (int j = 0; !done && j < datanodes.length; j++) { try { - tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]); + tryDatanode(blockGroup, stripedBlockInfo, datanodes[j], + requestedNumBytes); done = true; } catch (InvalidBlockTokenException ibte) { if (bgIdx > getLastRetriedIndex()) { @@ -496,7 +498,8 @@ private boolean checksumBlockGroup( */ private void tryDatanode(LocatedStripedBlock blockGroup, StripedBlockInfo stripedBlockInfo, - DatanodeInfo datanode) throws IOException { + DatanodeInfo datanode, + long requestedNumBytes) throws IOException { try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(), blockGroup.getBlockToken())) { @@ -506,7 +509,7 @@ private void tryDatanode(LocatedStripedBlock blockGroup, // get block MD5 createSender(pair).blockGroupChecksum(stripedBlockInfo, - blockGroup.getBlockToken()); + blockGroup.getBlockToken(), requestedNumBytes); BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(pair.in)); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index ad3f2ad90a1..94f89066ef8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -207,8 +207,11 @@ void 
blockChecksum(ExtendedBlock blk, * * @param stripedBlockInfo a striped block info. * @param blockToken security token for accessing the block. + * @param requestedNumBytes requested number of bytes in the block group + * to compute the checksum. * @throws IOException */ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, - Token blockToken) throws IOException; + Token blockToken, + long requestedNumBytes) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index bc73bfc32c6..e133975f535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -266,7 +266,8 @@ public void blockChecksum(final ExtendedBlock blk, @Override public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, - Token blockToken) throws IOException { + Token blockToken, long requestedNumBytes) + throws IOException { OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( stripedBlockInfo.getBlock(), blockToken)) @@ -278,6 +279,7 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, .convertBlockIndices(stripedBlockInfo.getBlockIndices())) .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( stripedBlockInfo.getErasureCodingPolicy())) + .setRequestedNumBytes(requestedNumBytes) .build(); send(out, Op.BLOCK_GROUP_CHECKSUM, proto); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 14073516950..290b158b702 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -155,6 +155,7 @@ message OpBlockGroupChecksumProto { repeated hadoop.common.TokenProto blockTokens = 3; required ErasureCodingPolicyProto ecPolicy = 4; repeated uint32 blockIndices = 5; + required uint64 requestedNumBytes = 6; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 8b863f7e7d8..08ab967b54e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -312,7 +312,8 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException { try { blockGroupChecksum(stripedBlockInfo, - PBHelperClient.convert(proto.getHeader().getToken())); + PBHelperClient.convert(proto.getHeader().getToken()), + proto.getRequestedNumBytes()); } finally { if (traceScope != null) { traceScope.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index ec6bbb6f85f..f54978546ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -285,10 +285,8 @@ void compute() throws IOException { } setOutBytes(md5out.getDigest()); - if (LOG.isDebugEnabled()) { - LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out); - } + LOG.debug("block={}, bytesPerCRC={}, crcPerBlock={}, md5out={}", + getBlock(), getBytesPerCRC(), getCrcPerBlock(), md5out); } finally { IOUtils.closeStream(getChecksumIn()); IOUtils.closeStream(getMetadataIn()); @@ -335,11 +333,13 @@ static class BlockGroupNonStripedChecksumComputer private final DatanodeInfo[] datanodes; private final Token[] blockTokens; private final byte[] blockIndices; + private final long requestedNumBytes; private final DataOutputBuffer md5writer = new DataOutputBuffer(); BlockGroupNonStripedChecksumComputer(DataNode datanode, - StripedBlockInfo stripedBlockInfo) + StripedBlockInfo stripedBlockInfo, + long requestedNumBytes) throws IOException { super(datanode); this.blockGroup = stripedBlockInfo.getBlock(); @@ -347,6 +347,7 @@ static class BlockGroupNonStripedChecksumComputer this.datanodes = stripedBlockInfo.getDatanodes(); this.blockTokens = stripedBlockInfo.getBlockTokens(); this.blockIndices = stripedBlockInfo.getBlockIndices(); + this.requestedNumBytes = requestedNumBytes; } private static class LiveBlockInfo { @@ -380,24 +381,29 @@ void compute() throws IOException { liveDns.put(blockIndices[idx], new LiveBlockInfo(datanodes[idx], blockTokens[idx])); } + long checksumLen = 0; for (int idx = 0; idx < numDataUnits && idx < blkIndxLen; idx++) { try { + ExtendedBlock block = getInternalBlock(numDataUnits, idx); + LiveBlockInfo liveBlkInfo = liveDns.get((byte) idx); if (liveBlkInfo == null) { // reconstruct block and calculate checksum for missing node - recalculateChecksum(idx); + recalculateChecksum(idx, block.getNumBytes()); } else { try { - ExtendedBlock block = StripedBlockUtil.constructInternalBlock( - blockGroup, ecPolicy.getCellSize(), numDataUnits, idx); checksumBlock(block, idx, liveBlkInfo.getToken(), liveBlkInfo.getDn()); } catch (IOException ioe) { LOG.warn("Exception while reading checksum", ioe); // reconstruct block and calculate checksum for the failed node - recalculateChecksum(idx); + recalculateChecksum(idx, block.getNumBytes()); } } + checksumLen += block.getNumBytes(); + if (checksumLen >= requestedNumBytes) { + break; // done with the computation, simply return. + } } catch (IOException e) { LOG.warn("Failed to get the checksum", e); } @@ -407,6 +413,20 @@ void compute() throws IOException { setOutBytes(md5out.getDigest()); } + private ExtendedBlock getInternalBlock(int numDataUnits, int idx) { + // Sets requested number of bytes in blockGroup which is required to + // construct the internal block for computing checksum. + long actualNumBytes = blockGroup.getNumBytes(); + blockGroup.setNumBytes(requestedNumBytes); + + ExtendedBlock block = StripedBlockUtil.constructInternalBlock(blockGroup, + ecPolicy.getCellSize(), numDataUnits, idx); + + // Set back actualNumBytes value in blockGroup. 
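A note on the getInternalBlock() helper added above: it temporarily overrides the block group's length with requestedNumBytes so that StripedBlockUtil.constructInternalBlock sizes each internal (per-data-unit) block against the requested range rather than the full block group. Below is a minimal, self-contained sketch of that cell-layout arithmetic for data units; the class and method names are hypothetical, it is not the actual StripedBlockUtil code, and it ignores parity indices, which the real helper also handles.

// Hypothetical illustration of the per-data-unit block length for a requested
// byte range of a striped block group (RS-style striping, cell by cell).
final class InternalBlockLengthSketch {
  static long internalBlockLength(long requestedNumBytes, int cellSize,
      int numDataUnits, int idx) {
    long stripeSize = (long) cellSize * numDataUnits;
    long fullStripes = requestedNumBytes / stripeSize;   // complete stripes
    long lastStripeLen = requestedNumBytes % stripeSize; // bytes in the tail stripe
    long fullCellsInTail = lastStripeLen / cellSize;
    long tailRemainder = lastStripeLen % cellSize;

    long len = fullStripes * cellSize;
    if (idx < fullCellsInTail) {
      len += cellSize;        // this unit holds a full cell of the tail stripe
    } else if (idx == fullCellsInTail) {
      len += tailRemainder;   // this unit holds the partial tail cell
    }
    return len;
  }

  public static void main(String[] args) {
    // With 64KB cells and 6 data units, a 100-byte request touches only
    // data unit 0; the remaining data units contribute 0 bytes.
    System.out.println(internalBlockLength(100, 64 * 1024, 6, 0)); // 100
    System.out.println(internalBlockLength(100, 64 * 1024, 6, 1)); // 0
  }
}

This is also why compute() above can stop iterating once the accumulated checksumLen reaches requestedNumBytes: data units that lie entirely past the requested range have zero-length internal blocks.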
+ blockGroup.setNumBytes(actualNumBytes); + return block; + } + private void checksumBlock(ExtendedBlock block, int blockIdx, Token blockToken, DatanodeInfo targetDatanode) throws IOException { @@ -446,9 +466,7 @@ private void checksumBlock(ExtendedBlock block, int blockIdx, //read md5 final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); md5.write(md5writer); - if (LOG.isDebugEnabled()) { - LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); - } + LOG.debug("got reply from datanode:{}, md5={}", targetDatanode, md5); } } @@ -456,34 +474,35 @@ private void checksumBlock(ExtendedBlock block, int blockIdx, * Reconstruct this data block and recalculate checksum. * * @param errBlkIndex - * error index to be reconstrcuted and recalculate checksum. + * error index to be reconstructed and recalculate checksum. + * @param blockLength + * number of bytes in the block to compute checksum. * @throws IOException */ - private void recalculateChecksum(int errBlkIndex) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Recalculate checksum for the missing/failed block index " - + errBlkIndex); - } + private void recalculateChecksum(int errBlkIndex, long blockLength) + throws IOException { + LOG.debug("Recalculate checksum for the missing/failed block index {}", + errBlkIndex); byte[] errIndices = new byte[1]; errIndices[0] = (byte) errBlkIndex; + StripedReconstructionInfo stripedReconInfo = new StripedReconstructionInfo( - blockGroup, ecPolicy, blockIndices, datanodes, errIndices); + blockGroup, ecPolicy, blockIndices, datanodes, errIndices); final StripedBlockChecksumReconstructor checksumRecon = new StripedBlockChecksumReconstructor( - getDatanode().getErasureCodingWorker(), stripedReconInfo, - md5writer); + getDatanode().getErasureCodingWorker(), stripedReconInfo, + md5writer, blockLength); checksumRecon.reconstruct(); DataChecksum checksum = checksumRecon.getChecksum(); long crcPerBlock = checksum.getChecksumSize() <= 0 ? 
0 : checksumRecon.getChecksumDataLen() / checksum.getChecksumSize(); - setOrVerifyChecksumProperties(errBlkIndex, checksum.getBytesPerChecksum(), - crcPerBlock, checksum.getChecksumType()); - if (LOG.isDebugEnabled()) { - LOG.debug("Recalculated checksum for the block index " + errBlkIndex - + ": md5=" + checksumRecon.getMD5()); - } + setOrVerifyChecksumProperties(errBlkIndex, + checksum.getBytesPerChecksum(), crcPerBlock, + checksum.getChecksumType()); + LOG.debug("Recalculated checksum for the block index:{}, md5={}", + errBlkIndex, checksumRecon.getMD5()); } private void setOrVerifyChecksumProperties(int blockIdx, int bpc, @@ -509,11 +528,9 @@ private void setOrVerifyChecksumProperties(int blockIdx, int bpc, setCrcType(DataChecksum.Type.MIXED); } - if (LOG.isDebugEnabled()) { - if (blockIdx == 0) { - LOG.debug("set bytesPerCRC=" + getBytesPerCRC() - + ", crcPerBlock=" + getCrcPerBlock()); - } + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC={}, crcPerBlock={}", getBytesPerCRC(), + getCrcPerBlock()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 829badd742f..9236a192185 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -964,7 +964,7 @@ public void blockChecksum(ExtendedBlock block, @Override public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, - final Token blockToken) + final Token blockToken, long requestedNumBytes) throws IOException { updateCurrentThreadName("Getting checksum for block group" + stripedBlockInfo.getBlock()); @@ -973,7 +973,8 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); AbstractBlockChecksumComputer maker = - new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo); + new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo, + requestedNumBytes); try { maker.compute(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java index 1b6758bff8a..c7294c726fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.DataOutputBuffer; @@ -41,14 +42,17 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor { private DataOutputBuffer checksumWriter; private MD5Hash md5; private long checksumDataLen; + private long requestedLen; public StripedBlockChecksumReconstructor(ErasureCodingWorker worker, StripedReconstructionInfo stripedReconInfo, - DataOutputBuffer checksumWriter) throws IOException { + DataOutputBuffer checksumWriter, + long requestedBlockLength) throws IOException { 
super(worker, stripedReconInfo); this.targetIndices = stripedReconInfo.getTargetIndices(); assert targetIndices != null; this.checksumWriter = checksumWriter; + this.requestedLen = requestedBlockLength; init(); } @@ -69,8 +73,9 @@ private void init() throws IOException { public void reconstruct() throws IOException { MessageDigest digester = MD5Hash.getDigester(); - while (getPositionInBlock() < getMaxTargetLength()) { - long remaining = getMaxTargetLength() - getPositionInBlock(); + long maxTargetLength = getMaxTargetLength(); + while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) { + long remaining = maxTargetLength - getPositionInBlock(); final int toReconstructLen = (int) Math .min(getStripedReader().getBufferSize(), remaining); // step1: read from minimum source DNs required for reconstruction. @@ -81,13 +86,11 @@ public void reconstruct() throws IOException { reconstructTargets(toReconstructLen); // step3: calculate checksum - getChecksum().calculateChunkedSums(targetBuffer.array(), 0, - targetBuffer.remaining(), checksumBuf, 0); + checksumDataLen += checksumWithTargetOutput(targetBuffer.array(), + toReconstructLen, digester); - // step4: updates the digest using the checksum array of bytes - digester.update(checksumBuf, 0, checksumBuf.length); - checksumDataLen += checksumBuf.length; updatePositionInBlock(toReconstructLen); + requestedLen -= toReconstructLen; clearBuffers(); } @@ -96,6 +99,56 @@ public void reconstruct() throws IOException { md5.write(checksumWriter); } + private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, + MessageDigest digester) throws IOException { + long checksumDataLength = 0; + // Calculate partial block checksum. There are two cases. + // case-1) length of data bytes which is fraction of bytesPerCRC + // case-2) length of data bytes which is less than bytesPerCRC + if (requestedLen <= toReconstructLen) { + int remainingLen = (int) requestedLen; + outputData = Arrays.copyOf(targetBuffer.array(), remainingLen); + + int partialLength = remainingLen % getChecksum().getBytesPerChecksum(); + + int checksumRemaining = (remainingLen + / getChecksum().getBytesPerChecksum()) + * getChecksum().getChecksumSize(); + + int dataOffset = 0; + + // case-1) length of data bytes which is fraction of bytesPerCRC + if (checksumRemaining > 0) { + remainingLen = remainingLen - partialLength; + checksumBuf = new byte[checksumRemaining]; + getChecksum().calculateChunkedSums(outputData, dataOffset, + remainingLen, checksumBuf, 0); + digester.update(checksumBuf, 0, checksumBuf.length); + checksumDataLength = checksumBuf.length; + dataOffset = remainingLen; + } + + // case-2) length of data bytes which is less than bytesPerCRC + if (partialLength > 0) { + byte[] partialCrc = new byte[getChecksum().getChecksumSize()]; + getChecksum().update(outputData, dataOffset, partialLength); + getChecksum().writeValue(partialCrc, 0, true); + digester.update(partialCrc); + checksumDataLength += partialCrc.length; + } + + clearBuffers(); + // calculated checksum for the requested length, return checksum length. 
+ return checksumDataLength; + } + getChecksum().calculateChunkedSums(outputData, 0, + outputData.length, checksumBuf, 0); + + // updates digest using the checksum array of bytes + digester.update(checksumBuf, 0, checksumBuf.length); + return checksumBuf.length; + } + private void reconstructTargets(int toReconstructLen) { initDecoderIfNecessary(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java index 3bee6be12ed..908ab0c44c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileChecksum.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.Path; @@ -31,6 +29,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import java.io.IOException; @@ -41,7 +42,8 @@ * are the same. */ public class TestFileChecksum { - public static final Log LOG = LogFactory.getLog(TestFileChecksum.class); + private static final Logger LOG = LoggerFactory + .getLogger(TestFileChecksum.class); private int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS; private int parityBlocks = StripedFileTestUtil.NUM_PARITY_BLOCKS; @@ -58,6 +60,7 @@ public class TestFileChecksum { private int stripSize = cellSize * dataBlocks; private int blockGroupSize = stripesPerBlock * stripSize; private int fileSize = numBlockGroups * blockGroupSize; + private int bytesPerCRC; private String ecDir = "/striped"; private String stripedFile1 = ecDir + "/stripedFileChecksum1"; @@ -79,10 +82,9 @@ public void setup() throws IOException { fs = cluster.getFileSystem(); client = fs.getClient(); - prepareTestFiles(); - - getDataNodeToKill(stripedFile1); - getDataNodeToKill(replicatedFile); + bytesPerCRC = conf.getInt( + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); } @After @@ -93,49 +95,57 @@ public void tearDown() { } } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum1() throws Exception { int length = 0; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, length + 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum2() throws Exception { int length = stripSize - 1; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, length - 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum3() throws Exception { int length = stripSize; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, length - 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum4() throws Exception { int length = stripSize + cellSize * 2; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, length - 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum5() throws Exception { int length = blockGroupSize; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); 
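The checksumWithTargetOutput() change above computes a partial-range checksum in two pieces: chunked CRCs for the part of the requested length that is a whole multiple of bytesPerCRC, plus one CRC over the trailing partial chunk, all fed into the running MD5 digest. The following is a rough standalone sketch of that idea, assuming java.util.zip.CRC32 in place of Hadoop's DataChecksum (typically CRC32C) and collapsing the two code paths into one loop; all names are hypothetical.

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;

final class PartialRangeChecksumSketch {
  // MD5 over per-chunk CRCs of the first requestedLen bytes of data.
  static byte[] md5OfCrcs(byte[] data, int requestedLen, int bytesPerCrc)
      throws NoSuchAlgorithmException {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    int fullChunks = requestedLen / bytesPerCrc;
    int partialLen = requestedLen % bytesPerCrc;

    int offset = 0;
    for (int i = 0; i < fullChunks; i++) {   // whole bytesPerCRC chunks
      digester.update(crcOf(data, offset, bytesPerCrc));
      offset += bytesPerCrc;
    }
    if (partialLen > 0) {                    // trailing partial chunk
      digester.update(crcOf(data, offset, partialLen));
    }
    return digester.digest();
  }

  private static byte[] crcOf(byte[] data, int off, int len) {
    CRC32 crc = new CRC32();
    crc.update(data, off, len);
    // 4-byte big-endian CRC value, mirroring how checksum bytes are laid out
    return ByteBuffer.allocate(4).putInt((int) crc.getValue()).array();
  }
}

In the patch itself the full chunks go through calculateChunkedSums() and the partial chunk through update()/writeValue(); the sketch merges the two only for readability.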
testStripedFileChecksum(length, length - 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum6() throws Exception { int length = blockGroupSize + blockSize; + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, length - 10); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksum7() throws Exception { int length = -1; // whole file + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); testStripedFileChecksum(length, fileSize); } - void testStripedFileChecksum(int range1, int range2) throws Exception { + private void testStripedFileChecksum(int range1, int range2) + throws Exception { FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, range1, false); FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, @@ -153,8 +163,9 @@ void testStripedFileChecksum(int range1, int range2) throws Exception { } } - @Test + @Test(timeout = 90000) public void testStripedAndReplicatedFileChecksum() throws Exception { + prepareTestFiles(fileSize, new String[] {stripedFile1, replicatedFile}); FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, 10, false); FileChecksum replicatedFileChecksum = getFileChecksum(replicatedFile, @@ -163,8 +174,9 @@ public void testStripedAndReplicatedFileChecksum() throws Exception { Assert.assertFalse(stripedFileChecksum1.equals(replicatedFileChecksum)); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception { + prepareTestFiles(fileSize, new String[] {stripedFile1}); FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, fileSize, false); FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile1, @@ -177,8 +189,9 @@ public void testStripedFileChecksumWithMissedDataBlocks1() throws Exception { stripedFileChecksum1.equals(stripedFileChecksumRecon)); } - @Test + @Test(timeout = 90000) public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception { + prepareTestFiles(fileSize, new String[] {stripedFile1, stripedFile2}); FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile1, -1, false); FileChecksum stripedFileChecksum2 = getFileChecksum(stripedFile2, -1, @@ -198,6 +211,255 @@ public void testStripedFileChecksumWithMissedDataBlocks2() throws Exception { stripedFileChecksum2.equals(stripedFileChecksum2Recon)); } + private void testStripedFileChecksumWithMissedDataBlocksRangeQuery( + String stripedFile, int requestedLen) throws Exception { + LOG.info("Checksum file:{}, requested length:{}", stripedFile, + requestedLen); + prepareTestFiles(fileSize, new String[] {stripedFile}); + FileChecksum stripedFileChecksum1 = getFileChecksum(stripedFile, + requestedLen, false); + FileChecksum stripedFileChecksumRecon = getFileChecksum(stripedFile, + requestedLen, true); + + LOG.info("stripedFileChecksum1:" + stripedFileChecksum1); + LOG.info("stripedFileChecksumRecon:" + stripedFileChecksumRecon); + + Assert.assertTrue("Checksum mismatches!", + stripedFileChecksum1.equals(stripedFileChecksumRecon)); + } + + /** + * Test to verify that the checksum can be computed for a small file less than + * bytesPerCRC size. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery1() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 1); + } + + /** + * Test to verify that the checksum can be computed for a small file less than + * bytesPerCRC size. 
+ */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery2() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, 10); + } + + /** + * Test to verify that the checksum can be computed by giving bytesPerCRC + * length of file range for checksum calculation. 512 is the value of + * bytesPerCRC. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery3() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + bytesPerCRC); + } + + /** + * Test to verify that the checksum can be computed by giving 'cellsize' + * length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery4() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + cellSize); + } + + /** + * Test to verify that the checksum can be computed by giving less than + * cellsize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery5() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + cellSize - 1); + } + + /** + * Test to verify that the checksum can be computed by giving greater than + * cellsize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery6() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + cellSize + 1); + } + + /** + * Test to verify that the checksum can be computed by giving two times + * cellsize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery7() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + cellSize * 2); + } + + /** + * Test to verify that the checksum can be computed by giving stripSize + * length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery8() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + stripSize); + } + + /** + * Test to verify that the checksum can be computed by giving less than + * stripSize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery9() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + stripSize - 1); + } + + /** + * Test to verify that the checksum can be computed by giving greater than + * stripSize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery10() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + stripSize + 1); + } + + /** + * Test to verify that the checksum can be computed by giving less than + * blockGroupSize length of file range for checksum calculation. 
+ */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery11() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + blockGroupSize - 1); + } + + /** + * Test to verify that the checksum can be computed by giving greaterthan + * blockGroupSize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery12() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + blockGroupSize + 1); + } + + /** + * Test to verify that the checksum can be computed by giving greater than + * blockGroupSize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery13() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + blockGroupSize * numBlockGroups / 2); + } + + /** + * Test to verify that the checksum can be computed by giving lessthan + * fileSize length of file range for checksum calculation. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery14() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + fileSize - 1); + } + + /** + * Test to verify that the checksum can be computed for a length greater than + * file size. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery15() + throws Exception { + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile1, + fileSize * 2); + } + + /** + * Test to verify that the checksum can be computed for a small file less than + * bytesPerCRC size. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery16() + throws Exception { + int fileLength = 100; + String stripedFile3 = ecDir + "/stripedFileChecksum3"; + prepareTestFiles(fileLength, new String[] {stripedFile3}); + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, + fileLength - 1); + } + + /** + * Test to verify that the checksum can be computed for a small file less than + * bytesPerCRC size. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery17() + throws Exception { + int fileLength = 100; + String stripedFile3 = ecDir + "/stripedFileChecksum3"; + prepareTestFiles(fileLength, new String[] {stripedFile3}); + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 1); + } + + /** + * Test to verify that the checksum can be computed for a small file less than + * bytesPerCRC size. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery18() + throws Exception { + int fileLength = 100; + String stripedFile3 = ecDir + "/stripedFileChecksum3"; + prepareTestFiles(fileLength, new String[] {stripedFile3}); + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, 10); + } + + /** + * Test to verify that the checksum can be computed with greater than file + * length. 
+ */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery19() + throws Exception { + int fileLength = 100; + String stripedFile3 = ecDir + "/stripedFileChecksum3"; + prepareTestFiles(fileLength, new String[] {stripedFile3}); + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, + fileLength * 2); + } + + /** + * Test to verify that the checksum can be computed for small file with less + * than file length. + */ + @Test(timeout = 90000) + public void testStripedFileChecksumWithMissedDataBlocksRangeQuery20() + throws Exception { + int fileLength = bytesPerCRC; + String stripedFile3 = ecDir + "/stripedFileChecksum3"; + prepareTestFiles(fileLength, new String[] {stripedFile3}); + testStripedFileChecksumWithMissedDataBlocksRangeQuery(stripedFile3, + bytesPerCRC - 1); + } + private FileChecksum getFileChecksum(String filePath, int range, boolean killDn) throws Exception { int dnIdxToDie = -1; @@ -223,12 +485,9 @@ private FileChecksum getFileChecksum(String filePath, int range, return fc; } - void prepareTestFiles() throws IOException { - byte[] fileData = StripedFileTestUtil.generateBytes(fileSize); - - String[] filePaths = new String[] { - stripedFile1, stripedFile2, replicatedFile - }; + private void prepareTestFiles(int fileLength, String[] filePaths) + throws IOException { + byte[] fileData = StripedFileTestUtil.generateBytes(fileLength); for (String filePath : filePaths) { Path testPath = new Path(filePath); @@ -267,4 +526,4 @@ int getDataNodeToKill(String filePath) throws IOException { return -1; } -} +} \ No newline at end of file
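For context on what the new range-query tests above exercise end to end: the client requests a length-limited checksum through the FileSystem#getFileChecksum(Path, long) overload, and with this patch the requested length is honored for striped files even when an internal block is missing and has to be reconstructed on the DataNode. A minimal usage sketch, assuming a placeholder NameNode URI and erasure-coded file path:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RangeChecksumClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hdfs://nn-host:8020 and /striped/file are placeholders, not values
    // taken from the patch.
    try (FileSystem fs =
        FileSystem.get(URI.create("hdfs://nn-host:8020"), conf)) {
      Path file = new Path("/striped/file");
      // Checksum over only the first 1 MB; the DataNode-side changes in this
      // patch limit the block-group computation to the requested bytes.
      FileChecksum checksum = fs.getFileChecksum(file, 1024L * 1024L);
      System.out.println(file + " : " + checksum);
    }
  }
}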