From e5ff0ea7ba087984262f1f27200ae5bb40d9b838 Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Sat, 26 Mar 2016 00:52:50 -0700 Subject: [PATCH] HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng --- .../dev-support/findbugsExcludeFile.xml | 1 + .../org/apache/hadoop/hdfs/DFSClient.java | 11 +- .../hadoop/hdfs/FileChecksumHelper.java | 187 ++++++++++-- .../datatransfer/DataTransferProtocol.java | 16 +- .../hadoop/hdfs/protocol/datatransfer/Op.java | 1 + .../hdfs/protocol/datatransfer/Sender.java | 19 ++ .../hdfs/protocolPB/PBHelperClient.java | 42 ++- .../hadoop/hdfs/util/StripedBlockUtil.java | 12 + .../src/main/proto/datatransfer.proto | 9 +- .../hdfs/protocol/datatransfer/Receiver.java | 28 ++ .../server/datanode/BlockChecksumHelper.java | 284 ++++++++++++++---- .../hdfs/server/datanode/DataXceiver.java | 43 +++ 12 files changed, 570 insertions(+), 83 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml index 2c3329e47a9..9d6ab9a1029 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml @@ -8,6 +8,7 @@ + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 3506d3ac70a..88bd21909d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1704,7 +1704,10 @@ public DataEncryptionKey newDataEncryptionKey() throws IOException { /** * Get the checksum of the whole file or a range of the file. Note that the - * range always starts from the beginning of the file. + * range always starts from the beginning of the file. The file can be + * in replicated form, or striped mode. It can be used to checksum and compare + * two replicated files, or two striped files, but not applicable for two + * files of different block layout forms. * @param src The file path * @param length the length of the range, i.e., the range is [0, length] * @return The checksum @@ -1717,7 +1720,11 @@ public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length) LocatedBlocks blockLocations = getBlockLocations(src, length); - FileChecksumHelper.FileChecksumComputer maker = + FileChecksumHelper.FileChecksumComputer maker; + ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy(); + maker = ecPolicy != null ? + new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src, + length, blockLocations, namenode, this, ecPolicy) : new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length, blockLocations, namenode, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java index d15db9f11c9..dfd939397b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FileChecksumHelper.java @@ -22,10 +22,13 @@ import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; @@ -75,7 +78,7 @@ static abstract class FileChecksumComputer { private int bytesPerCRC = -1; private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; private long crcPerBlock = 0; - private boolean refetchBlocks = false; + private boolean isRefetchBlocks = false; private int lastRetriedIndex = -1; /** @@ -127,8 +130,11 @@ LocatedBlocks getBlockLocations() { return blockLocations; } - void setBlockLocations(LocatedBlocks blockLocations) { - this.blockLocations = blockLocations; + void refetchBlocks() throws IOException { + this.blockLocations = getClient().getBlockLocations(getSrc(), + getLength()); + this.locatedBlocks = getBlockLocations().getLocatedBlocks(); + this.isRefetchBlocks = false; } int getTimeout() { @@ -143,10 +149,6 @@ List getLocatedBlocks() { return locatedBlocks; } - void setLocatedBlocks(List locatedBlocks) { - this.locatedBlocks = locatedBlocks; - } - long getRemaining() { return remaining; } @@ -180,11 +182,11 @@ void setCrcPerBlock(long crcPerBlock) { } boolean isRefetchBlocks() { - return refetchBlocks; + return isRefetchBlocks; } void setRefetchBlocks(boolean refetchBlocks) { - this.refetchBlocks = refetchBlocks; + this.isRefetchBlocks = refetchBlocks; } int getLastRetriedIndex() { @@ -278,10 +280,7 @@ void checksumBlocks() throws IOException { blockIdx < getLocatedBlocks().size() && getRemaining() >= 0; blockIdx++) { if (isRefetchBlocks()) { // refetch to get fresh tokens - setBlockLocations(getClient().getBlockLocations(getSrc(), - getLength())); - setLocatedBlocks(getBlockLocations().getLocatedBlocks()); - setRefetchBlocks(false); + refetchBlocks(); } LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); @@ -380,15 +379,13 @@ private void tryDatanode(LocatedBlock locatedBlock, } //read md5 - final MD5Hash md5 = new MD5Hash( - checksumData.getMd5().toByteArray()); + final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray()); md5.write(getMd5out()); // read crc-type final DataChecksum.Type ct; if (checksumData.hasCrcType()) { - ct = PBHelperClient.convert(checksumData - .getCrcType()); + ct = PBHelperClient.convert(checksumData.getCrcType()); } else { LOG.debug("Retrieving checksum from an earlier-version DataNode: " + "inferring checksum by reading first byte"); @@ -413,4 +410,160 @@ && getCrcType() != ct) { } } } + + /** + * Striped file checksum computing. + */ + static class StripedFileNonStripedChecksumComputer + extends FileChecksumComputer { + private final ErasureCodingPolicy ecPolicy; + private int bgIdx; + + StripedFileNonStripedChecksumComputer(String src, long length, + LocatedBlocks blockLocations, + ClientProtocol namenode, + DFSClient client, + ErasureCodingPolicy ecPolicy) + throws IOException { + super(src, length, blockLocations, namenode, client); + + this.ecPolicy = ecPolicy; + } + + @Override + void checksumBlocks() throws IOException { + int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout(); + setTimeout(tmpTimeout); + + for (bgIdx = 0; + bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) { + if (isRefetchBlocks()) { // refetch to get fresh tokens + refetchBlocks(); + } + + LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx); + LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; + + if (!checksumBlockGroup(blockGroup)) { + throw new IOException("Fail to get block MD5 for " + locatedBlock); + } + } + } + + + private boolean checksumBlockGroup( + LocatedStripedBlock blockGroup) throws IOException { + ExtendedBlock block = blockGroup.getBlock(); + if (getRemaining() < block.getNumBytes()) { + block.setNumBytes(getRemaining()); + } + setRemaining(getRemaining() - block.getNumBytes()); + + StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, + blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy); + DatanodeInfo[] datanodes = blockGroup.getLocations(); + + //try each datanode in the block group. + boolean done = false; + for (int j = 0; !done && j < datanodes.length; j++) { + try { + tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]); + done = true; + } catch (InvalidBlockTokenException ibte) { + if (bgIdx > getLastRetriedIndex()) { + LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " + + "for file {} for block {} from datanode {}. Will retry " + + "the block once.", + getSrc(), block, datanodes[j]); + setLastRetriedIndex(bgIdx); + done = true; // actually it's not done; but we'll retry + bgIdx--; // repeat at bgIdx-th block + setRefetchBlocks(true); + } + } catch (IOException ie) { + LOG.warn("src={}" + ", datanodes[{}]={}", + getSrc(), j, datanodes[j], ie); + } + } + + return done; + } + + /** + * Return true when sounds good to continue or retry, false when severe + * condition or totally failed. + */ + private void tryDatanode(LocatedStripedBlock blockGroup, + StripedBlockInfo stripedBlockInfo, + DatanodeInfo datanode) throws IOException { + + try (IOStreamPair pair = getClient().connectToDN(datanode, + getTimeout(), blockGroup.getBlockToken())) { + + LOG.debug("write to {}: {}, blockGroup={}", + datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup); + + // get block MD5 + createSender(pair).blockGroupChecksum(stripedBlockInfo, + blockGroup.getBlockToken()); + + BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(pair.in)); + + String logInfo = "for blockGroup " + blockGroup + + " from datanode " + datanode; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (bgIdx == 0) { //first block + setBytesPerCRC(bpc); + } else { + if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block + setCrcPerBlock(cpb); + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(getMd5out()); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = getClient().inferChecksumTypeByReading(blockGroup, datanode); + } + + if (bgIdx == 0) { + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (bgIdx == 0) { + LOG.debug("set bytesPerCRC=" + getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + LOG.debug("got reply from " + datanode + ": md5=" + md5); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 4aa545b3338..ad3f2ad90a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; @@ -197,6 +198,17 @@ void copyBlock(final ExtendedBlock blk, * @param blockToken security token for accessing the block. * @throws IOException */ - void blockChecksum(final ExtendedBlock blk, - final Token blockToken) throws IOException; + void blockChecksum(ExtendedBlock blk, + Token blockToken) throws IOException; + + + /** + * Get striped block group checksum (MD5 of CRC32). + * + * @param stripedBlockInfo a striped block info. + * @param blockToken security token for accessing the block. + * @throws IOException + */ + void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, + Token blockToken) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 511574c4145..94250e5e7f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -38,6 +38,7 @@ public enum Op { REQUEST_SHORT_CIRCUIT_FDS((byte)87), RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), + BLOCK_GROUP_CHECKSUM((byte)90), CUSTOM((byte)127); /** The code for this operation. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 65456815aed..585ed99b1ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -28,11 +28,13 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -261,4 +263,21 @@ public void blockChecksum(final ExtendedBlock blk, send(out, Op.BLOCK_CHECKSUM, proto); } + + @Override + public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, + Token blockToken) throws IOException { + OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader( + stripedBlockInfo.getBlock(), blockToken)) + .setDatanodes(PBHelperClient.convertToProto( + stripedBlockInfo.getDatanodes())) + .addAllBlockTokens(PBHelperClient.convert( + stripedBlockInfo.getBlockTokens())) + .setEcPolicy(PBHelperClient.convertErasureCodingPolicy( + stripedBlockInfo.getErasureCodingPolicy())) + .build(); + + send(out, Op.BLOCK_GROUP_CHECKSUM, proto); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java index 38e875c012d..47593732123 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java @@ -553,10 +553,8 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { proto.getCorrupt(), cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); List tokenProtos = proto.getBlockTokensList(); - Token[] blockTokens = new Token[indices.length]; - for (int i = 0; i < indices.length; i++) { - blockTokens[i] = convert(tokenProtos.get(i)); - } + Token[] blockTokens = + convertTokens(tokenProtos); ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); } lb.setBlockToken(convert(proto.getBlockToken())); @@ -564,6 +562,18 @@ public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { return lb; } + static public Token[] convertTokens( + List tokenProtos) { + + @SuppressWarnings("unchecked") + Token[] blockTokens = new Token[tokenProtos.size()]; + for (int i = 0; i < blockTokens.length; i++) { + blockTokens[i] = convert(tokenProtos.get(i)); + } + + return blockTokens; + } + static public DatanodeInfo convert(DatanodeInfoProto di) { if (di == null) return null; return new DatanodeInfo( @@ -815,9 +825,7 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { byte[] indices = sb.getBlockIndices(); builder.setBlockIndices(PBHelperClient.getByteString(indices)); Token[] blockTokens = sb.getBlockTokens(); - for (int i = 0; i < indices.length; i++) { - builder.addBlockTokens(PBHelperClient.convert(blockTokens[i])); - } + builder.addAllBlockTokens(convert(blockTokens)); } return builder.setB(PBHelperClient.convert(b.getBlock())) @@ -825,6 +833,16 @@ public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } + public static List convert( + Token[] blockTokens) { + List results = new ArrayList<>(blockTokens.length); + for (Token bt : blockTokens) { + results.add(convert(bt)); + } + + return results; + } + public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) { List cList = proto.getCreationPolicy() .getStorageTypesList(); @@ -2500,4 +2518,14 @@ public static ErasureCodingPolicyProto convertErasureCodingPolicy( .setId(policy.getId()); return builder.build(); } + + public static HdfsProtos.DatanodeInfosProto convertToProto( + DatanodeInfo[] datanodeInfos) { + HdfsProtos.DatanodeInfosProto.Builder builder = + HdfsProtos.DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : datanodeInfos) { + builder.addDatanodes(PBHelperClient.convert(datanodeInfo)); + } + return builder.build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 43772e2efbd..0819376aac0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -75,6 +75,18 @@ public class StripedBlockUtil { public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class); + /** + * Parses a striped block group into individual blocks. + * @param bg The striped block group + * @param ecPolicy The erasure coding policy + * @return An array of the blocks in the group + */ + public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg, + ErasureCodingPolicy ecPolicy) { + return parseStripedBlockGroup(bg, ecPolicy.getCellSize(), + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits()); + } + /** * This method parses a striped block group into individual blocks. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index a091d417d7c..522ee06b68b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -74,7 +74,6 @@ message OpReadBlockProto { optional CachingStrategyProto cachingStrategy = 5; } - message ChecksumProto { required ChecksumTypeProto type = 1; required uint32 bytesPerChecksum = 2; @@ -149,6 +148,14 @@ message OpBlockChecksumProto { required BaseHeaderProto header = 1; } +message OpBlockGroupChecksumProto { + required BaseHeaderProto header = 1; + required DatanodeInfosProto datanodes = 2; + // each internal block has a block token + repeated hadoop.common.TokenProto blockTokens = 3; + required ErasureCodingPolicyProto ecPolicy = 4; +} + /** * An ID uniquely identifying a shared memory segment. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index e0401574ce6..b2f26f8d3cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -26,11 +26,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -111,6 +113,9 @@ protected final void processOp(Op op) throws IOException { case BLOCK_CHECKSUM: opBlockChecksum(in); break; + case BLOCK_GROUP_CHECKSUM: + opStripedBlockChecksum(in); + break; case TRANSFER_BLOCK: opTransferBlock(in); break; @@ -290,4 +295,27 @@ private void opBlockChecksum(DataInputStream in) throws IOException { if (traceScope != null) traceScope.close(); } } + + /** Receive OP_STRIPED_BLOCK_CHECKSUM. */ + private void opStripedBlockChecksum(DataInputStream dis) throws IOException { + OpBlockGroupChecksumProto proto = + OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + StripedBlockInfo stripedBlockInfo = new StripedBlockInfo( + PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getDatanodes()), + PBHelperClient.convertTokens(proto.getBlockTokensList()), + PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy()) + ); + + try { + blockGroupChecksum(stripedBlockInfo, + PBHelperClient.convert(proto.getHeader().getToken())); + } finally { + if (traceScope != null) { + traceScope.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java index 9a5552db3a7..1f1a25c4184 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockChecksumHelper.java @@ -19,16 +19,30 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Op; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos; +import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; @@ -41,13 +55,87 @@ final class BlockChecksumHelper { static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class); - private BlockChecksumHelper() {} + private BlockChecksumHelper() { + } /** * The abstract base block checksum computer. */ - static abstract class BlockChecksumComputer { + static abstract class AbstractBlockChecksumComputer { private final DataNode datanode; + + private byte[] outBytes; + private int bytesPerCRC = -1; + private DataChecksum.Type crcType = null; + private long crcPerBlock = -1; + private int checksumSize = -1; + + AbstractBlockChecksumComputer(DataNode datanode) throws IOException { + this.datanode = datanode; + } + + abstract void compute() throws IOException; + + Sender createSender(IOStreamPair pair) { + DataOutputStream out = (DataOutputStream) pair.out; + return new Sender(out); + } + + DataNode getDatanode() { + return datanode; + } + + InputStream getBlockInputStream(ExtendedBlock block, long seekOffset) + throws IOException { + return datanode.data.getBlockInputStream(block, seekOffset); + } + + void setOutBytes(byte[] bytes) { + this.outBytes = bytes; + } + + byte[] getOutBytes() { + return outBytes; + } + + int getBytesPerCRC() { + return bytesPerCRC; + } + + public void setBytesPerCRC(int bytesPerCRC) { + this.bytesPerCRC = bytesPerCRC; + } + + public void setCrcType(DataChecksum.Type crcType) { + this.crcType = crcType; + } + + public void setCrcPerBlock(long crcPerBlock) { + this.crcPerBlock = crcPerBlock; + } + + public void setChecksumSize(int checksumSize) { + this.checksumSize = checksumSize; + } + + DataChecksum.Type getCrcType() { + return crcType; + } + + long getCrcPerBlock() { + return crcPerBlock; + } + + int getChecksumSize() { + return checksumSize; + } + } + + /** + * The abstract base block checksum computer. + */ + static abstract class BlockChecksumComputer + extends AbstractBlockChecksumComputer { private final ExtendedBlock block; // client side now can specify a range of the block for checksum private final long requestLength; @@ -56,17 +144,12 @@ static abstract class BlockChecksumComputer { private final long visibleLength; private final boolean partialBlk; - private byte[] outBytes; - private int bytesPerCRC = -1; - private DataChecksum.Type crcType = null; - private long crcPerBlock = -1; - private int checksumSize = -1; private BlockMetadataHeader header; private DataChecksum checksum; BlockChecksumComputer(DataNode datanode, ExtendedBlock block) throws IOException { - this.datanode = datanode; + super(datanode); this.block = block; this.requestLength = block.getNumBytes(); Preconditions.checkArgument(requestLength >= 0); @@ -81,98 +164,80 @@ static abstract class BlockChecksumComputer { new BufferedInputStream(metadataIn, ioFileBufferSize)); } - protected DataNode getDatanode() { - return datanode; + Sender createSender(IOStreamPair pair) { + DataOutputStream out = (DataOutputStream) pair.out; + return new Sender(out); } - protected ExtendedBlock getBlock() { + + ExtendedBlock getBlock() { return block; } - protected long getRequestLength() { + long getRequestLength() { return requestLength; } - protected LengthInputStream getMetadataIn() { + LengthInputStream getMetadataIn() { return metadataIn; } - protected DataInputStream getChecksumIn() { + DataInputStream getChecksumIn() { return checksumIn; } - protected long getVisibleLength() { + long getVisibleLength() { return visibleLength; } - protected boolean isPartialBlk() { + boolean isPartialBlk() { return partialBlk; } - protected void setOutBytes(byte[] bytes) { - this.outBytes = bytes; - } - - protected byte[] getOutBytes() { - return outBytes; - } - - protected int getBytesPerCRC() { - return bytesPerCRC; - } - - protected DataChecksum.Type getCrcType() { - return crcType; - } - - protected long getCrcPerBlock() { - return crcPerBlock; - } - - protected int getChecksumSize() { - return checksumSize; - } - - protected BlockMetadataHeader getHeader() { + BlockMetadataHeader getHeader() { return header; } - protected DataChecksum getChecksum() { + DataChecksum getChecksum() { return checksum; } /** * Perform the block checksum computing. + * * @throws IOException */ abstract void compute() throws IOException; /** * Read block metadata header. + * * @throws IOException */ - protected void readHeader() throws IOException { + void readHeader() throws IOException { //read metadata file header = BlockMetadataHeader.readHeader(checksumIn); checksum = header.getChecksum(); - checksumSize = checksum.getChecksumSize(); - bytesPerCRC = checksum.getBytesPerChecksum(); - crcPerBlock = checksumSize <= 0 ? 0 : + setChecksumSize(checksum.getChecksumSize()); + setBytesPerCRC(checksum.getBytesPerChecksum()); + long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 : (metadataIn.getLength() - - BlockMetadataHeader.getHeaderSize()) / checksumSize; - crcType = checksum.getChecksumType(); + BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize(); + setCrcPerBlock(crcPerBlock); + setCrcType(checksum.getChecksumType()); } /** * Calculate partial block checksum. + * * @return * @throws IOException */ - protected byte[] crcPartialBlock() throws IOException { - int partialLength = (int) (requestLength % bytesPerCRC); + byte[] crcPartialBlock() throws IOException { + int partialLength = (int) (requestLength % getBytesPerCRC()); if (partialLength > 0) { byte[] buf = new byte[partialLength]; - final InputStream blockIn = datanode.data.getBlockInputStream(block, + final InputStream blockIn = getBlockInputStream(block, requestLength - partialLength); try { // Get the CRC of the partialLength. @@ -181,7 +246,7 @@ protected byte[] crcPartialBlock() throws IOException { IOUtils.closeStream(blockIn); } checksum.update(buf, 0, partialLength); - byte[] partialCrc = new byte[checksumSize]; + byte[] partialCrc = new byte[getChecksumSize()]; checksum.writeValue(partialCrc, 0, true); return partialCrc; } @@ -229,7 +294,7 @@ private MD5Hash checksumWholeBlock() throws IOException { } private MD5Hash checksumPartialBlock() throws IOException { - byte[] buffer = new byte[4*1024]; + byte[] buffer = new byte[4 * 1024]; MessageDigest digester = MD5Hash.getDigester(); long remaining = (getRequestLength() / getBytesPerCRC()) @@ -251,4 +316,115 @@ private MD5Hash checksumPartialBlock() throws IOException { return new MD5Hash(digester.digest()); } } -} + + /** + * Non-striped block group checksum computer for striped blocks. + */ + static class BlockGroupNonStripedChecksumComputer + extends AbstractBlockChecksumComputer { + + private final ExtendedBlock blockGroup; + private final ErasureCodingPolicy ecPolicy; + private final DatanodeInfo[] datanodes; + private final Token[] blockTokens; + + private final DataOutputBuffer md5writer = new DataOutputBuffer(); + + BlockGroupNonStripedChecksumComputer(DataNode datanode, + StripedBlockInfo stripedBlockInfo) + throws IOException { + super(datanode); + this.blockGroup = stripedBlockInfo.getBlock(); + this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy(); + this.datanodes = stripedBlockInfo.getDatanodes(); + this.blockTokens = stripedBlockInfo.getBlockTokens(); + } + + @Override + void compute() throws IOException { + for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) { + ExtendedBlock block = + StripedBlockUtil.constructInternalBlock(blockGroup, + ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx); + DatanodeInfo targetDatanode = datanodes[idx]; + Token blockToken = blockTokens[idx]; + checksumBlock(block, idx, blockToken, targetDatanode); + } + + MD5Hash md5out = MD5Hash.digest(md5writer.getData()); + setOutBytes(md5out.getDigest()); + } + + private void checksumBlock(ExtendedBlock block, int blockIdx, + Token blockToken, + DatanodeInfo targetDatanode) throws IOException { + int timeout = 3000; + try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode, + timeout, block, blockToken)) { + + LOG.debug("write to {}: {}, block={}", + getDatanode(), Op.BLOCK_CHECKSUM, block); + + // get block MD5 + createSender(pair).blockChecksum(block, blockToken); + + final DataTransferProtos.BlockOpResponseProto reply = + DataTransferProtos.BlockOpResponseProto.parseFrom( + PBHelperClient.vintPrefixed(pair.in)); + + String logInfo = "for block " + block + + " from datanode " + targetDatanode; + DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); + + DataTransferProtos.OpBlockChecksumResponseProto checksumData = + reply.getChecksumResponse(); + + //read byte-per-checksum + final int bpc = checksumData.getBytesPerCrc(); + if (blockIdx == 0) { //first block + setBytesPerCRC(bpc); + } else if (bpc != getBytesPerCRC()) { + throw new IOException("Byte-per-checksum not matched: bpc=" + bpc + + " but bytesPerCRC=" + getBytesPerCRC()); + } + + //read crc-per-block + final long cpb = checksumData.getCrcPerBlock(); + if (blockIdx == 0) { + setCrcPerBlock(cpb); + } + + //read md5 + final MD5Hash md5 = new MD5Hash( + checksumData.getMd5().toByteArray()); + md5.write(md5writer); + + // read crc-type + final DataChecksum.Type ct; + if (checksumData.hasCrcType()) { + ct = PBHelperClient.convert(checksumData.getCrcType()); + } else { + LOG.debug("Retrieving checksum from an earlier-version DataNode: " + + "inferring checksum by reading first byte"); + ct = DataChecksum.Type.DEFAULT; + } + + if (blockIdx == 0) { // first block + setCrcType(ct); + } else if (getCrcType() != DataChecksum.Type.MIXED && + getCrcType() != ct) { + // if crc types are mixed in a file + setCrcType(DataChecksum.Type.MIXED); + } + + if (LOG.isDebugEnabled()) { + if (blockIdx == 0) { + LOG.debug("set bytesPerCRC=" + getBytesPerCRC() + + ", crcPerBlock=" + getCrcPerBlock()); + } + LOG.debug("got reply from " + targetDatanode + ": md5=" + md5); + } + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 1d4a79ac88d..63bf5ae5363 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; @@ -46,7 +47,9 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer; +import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; @@ -923,6 +926,46 @@ public void blockChecksum(ExtendedBlock block, datanode.metrics.addBlockChecksumOp(elapsed()); } + @Override + public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, + final Token blockToken) + throws IOException { + updateCurrentThreadName("Getting checksum for block group" + + stripedBlockInfo.getBlock()); + final DataOutputStream out = new DataOutputStream(getOutputStream()); + checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken, + Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); + + AbstractBlockChecksumComputer maker = + new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo); + + try { + maker.compute(); + + //write reply + BlockOpResponseProto.newBuilder() + .setStatus(SUCCESS) + .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() + .setBytesPerCrc(maker.getBytesPerCRC()) + .setCrcPerBlock(maker.getCrcPerBlock()) + .setMd5(ByteString.copyFrom(maker.getOutBytes())) + .setCrcType(PBHelperClient.convert(maker.getCrcType()))) + .build() + .writeDelimitedTo(out); + out.flush(); + } catch (IOException ioe) { + LOG.info("blockChecksum " + stripedBlockInfo.getBlock() + + " received exception " + ioe); + incrDatanodeNetworkErrors(); + throw ioe; + } finally { + IOUtils.closeStream(out); + } + + //update metrics + datanode.metrics.addBlockChecksumOp(elapsed()); + } + @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException {