Revert "HDFS-9694. Make existing DFSClient#getFileChecksum() work for striped blocks. Contributed by Kai Zheng"
This reverts commit e5ff0ea7ba
.
This commit is contained in:
parent
e5ff0ea7ba
commit
a337ceb74e
|
@ -8,7 +8,6 @@
|
||||||
<Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
|
<Class name="org.apache.hadoop.hdfs.protocol.LocatedBlock"/>
|
||||||
<Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
|
<Class name="org.apache.hadoop.hdfs.protocol.BlockStoragePolicy"/>
|
||||||
<Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
|
<Class name="org.apache.hadoop.hdfs.protocol.CorruptFileBlocks"/>
|
||||||
<Class name="org.apache.hadoop.hdfs.protocol.StripedBlockInfo"/>
|
|
||||||
<Class name="org.apache.hadoop.hdfs.protocol.DirectoryListing"/>
|
<Class name="org.apache.hadoop.hdfs.protocol.DirectoryListing"/>
|
||||||
<Class name="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier"/>
|
<Class name="org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier"/>
|
||||||
<Class name="org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey"/>
|
<Class name="org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey"/>
|
||||||
|
|
|
@ -1704,10 +1704,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the checksum of the whole file or a range of the file. Note that the
|
* Get the checksum of the whole file or a range of the file. Note that the
|
||||||
* range always starts from the beginning of the file. The file can be
|
* range always starts from the beginning of the file.
|
||||||
* in replicated form, or striped mode. It can be used to checksum and compare
|
|
||||||
* two replicated files, or two striped files, but not applicable for two
|
|
||||||
* files of different block layout forms.
|
|
||||||
* @param src The file path
|
* @param src The file path
|
||||||
* @param length the length of the range, i.e., the range is [0, length]
|
* @param length the length of the range, i.e., the range is [0, length]
|
||||||
* @return The checksum
|
* @return The checksum
|
||||||
|
@ -1720,11 +1717,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
|
|
||||||
LocatedBlocks blockLocations = getBlockLocations(src, length);
|
LocatedBlocks blockLocations = getBlockLocations(src, length);
|
||||||
|
|
||||||
FileChecksumHelper.FileChecksumComputer maker;
|
FileChecksumHelper.FileChecksumComputer maker =
|
||||||
ErasureCodingPolicy ecPolicy = blockLocations.getErasureCodingPolicy();
|
|
||||||
maker = ecPolicy != null ?
|
|
||||||
new FileChecksumHelper.StripedFileNonStripedChecksumComputer(src,
|
|
||||||
length, blockLocations, namenode, this, ecPolicy) :
|
|
||||||
new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
|
new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
|
||||||
blockLocations, namenode, this);
|
blockLocations, namenode, this);
|
||||||
|
|
||||||
|
|
|
@ -22,13 +22,10 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
|
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
||||||
|
@ -78,7 +75,7 @@ final class FileChecksumHelper {
|
||||||
private int bytesPerCRC = -1;
|
private int bytesPerCRC = -1;
|
||||||
private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
|
private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
|
||||||
private long crcPerBlock = 0;
|
private long crcPerBlock = 0;
|
||||||
private boolean isRefetchBlocks = false;
|
private boolean refetchBlocks = false;
|
||||||
private int lastRetriedIndex = -1;
|
private int lastRetriedIndex = -1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -130,11 +127,8 @@ final class FileChecksumHelper {
|
||||||
return blockLocations;
|
return blockLocations;
|
||||||
}
|
}
|
||||||
|
|
||||||
void refetchBlocks() throws IOException {
|
void setBlockLocations(LocatedBlocks blockLocations) {
|
||||||
this.blockLocations = getClient().getBlockLocations(getSrc(),
|
this.blockLocations = blockLocations;
|
||||||
getLength());
|
|
||||||
this.locatedBlocks = getBlockLocations().getLocatedBlocks();
|
|
||||||
this.isRefetchBlocks = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int getTimeout() {
|
int getTimeout() {
|
||||||
|
@ -149,6 +143,10 @@ final class FileChecksumHelper {
|
||||||
return locatedBlocks;
|
return locatedBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
|
||||||
|
this.locatedBlocks = locatedBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
long getRemaining() {
|
long getRemaining() {
|
||||||
return remaining;
|
return remaining;
|
||||||
}
|
}
|
||||||
|
@ -182,11 +180,11 @@ final class FileChecksumHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isRefetchBlocks() {
|
boolean isRefetchBlocks() {
|
||||||
return isRefetchBlocks;
|
return refetchBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setRefetchBlocks(boolean refetchBlocks) {
|
void setRefetchBlocks(boolean refetchBlocks) {
|
||||||
this.isRefetchBlocks = refetchBlocks;
|
this.refetchBlocks = refetchBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
int getLastRetriedIndex() {
|
int getLastRetriedIndex() {
|
||||||
|
@ -280,7 +278,10 @@ final class FileChecksumHelper {
|
||||||
blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
|
blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
|
||||||
blockIdx++) {
|
blockIdx++) {
|
||||||
if (isRefetchBlocks()) { // refetch to get fresh tokens
|
if (isRefetchBlocks()) { // refetch to get fresh tokens
|
||||||
refetchBlocks();
|
setBlockLocations(getClient().getBlockLocations(getSrc(),
|
||||||
|
getLength()));
|
||||||
|
setLocatedBlocks(getBlockLocations().getLocatedBlocks());
|
||||||
|
setRefetchBlocks(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
|
LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
|
||||||
|
@ -379,13 +380,15 @@ final class FileChecksumHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
//read md5
|
//read md5
|
||||||
final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
|
final MD5Hash md5 = new MD5Hash(
|
||||||
|
checksumData.getMd5().toByteArray());
|
||||||
md5.write(getMd5out());
|
md5.write(getMd5out());
|
||||||
|
|
||||||
// read crc-type
|
// read crc-type
|
||||||
final DataChecksum.Type ct;
|
final DataChecksum.Type ct;
|
||||||
if (checksumData.hasCrcType()) {
|
if (checksumData.hasCrcType()) {
|
||||||
ct = PBHelperClient.convert(checksumData.getCrcType());
|
ct = PBHelperClient.convert(checksumData
|
||||||
|
.getCrcType());
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
|
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
|
||||||
"inferring checksum by reading first byte");
|
"inferring checksum by reading first byte");
|
||||||
|
@ -410,160 +413,4 @@ final class FileChecksumHelper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Striped file checksum computing.
|
|
||||||
*/
|
|
||||||
static class StripedFileNonStripedChecksumComputer
|
|
||||||
extends FileChecksumComputer {
|
|
||||||
private final ErasureCodingPolicy ecPolicy;
|
|
||||||
private int bgIdx;
|
|
||||||
|
|
||||||
StripedFileNonStripedChecksumComputer(String src, long length,
|
|
||||||
LocatedBlocks blockLocations,
|
|
||||||
ClientProtocol namenode,
|
|
||||||
DFSClient client,
|
|
||||||
ErasureCodingPolicy ecPolicy)
|
|
||||||
throws IOException {
|
|
||||||
super(src, length, blockLocations, namenode, client);
|
|
||||||
|
|
||||||
this.ecPolicy = ecPolicy;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void checksumBlocks() throws IOException {
|
|
||||||
int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout();
|
|
||||||
setTimeout(tmpTimeout);
|
|
||||||
|
|
||||||
for (bgIdx = 0;
|
|
||||||
bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) {
|
|
||||||
if (isRefetchBlocks()) { // refetch to get fresh tokens
|
|
||||||
refetchBlocks();
|
|
||||||
}
|
|
||||||
|
|
||||||
LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx);
|
|
||||||
LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock;
|
|
||||||
|
|
||||||
if (!checksumBlockGroup(blockGroup)) {
|
|
||||||
throw new IOException("Fail to get block MD5 for " + locatedBlock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private boolean checksumBlockGroup(
|
|
||||||
LocatedStripedBlock blockGroup) throws IOException {
|
|
||||||
ExtendedBlock block = blockGroup.getBlock();
|
|
||||||
if (getRemaining() < block.getNumBytes()) {
|
|
||||||
block.setNumBytes(getRemaining());
|
|
||||||
}
|
|
||||||
setRemaining(getRemaining() - block.getNumBytes());
|
|
||||||
|
|
||||||
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block,
|
|
||||||
blockGroup.getLocations(), blockGroup.getBlockTokens(), ecPolicy);
|
|
||||||
DatanodeInfo[] datanodes = blockGroup.getLocations();
|
|
||||||
|
|
||||||
//try each datanode in the block group.
|
|
||||||
boolean done = false;
|
|
||||||
for (int j = 0; !done && j < datanodes.length; j++) {
|
|
||||||
try {
|
|
||||||
tryDatanode(blockGroup, stripedBlockInfo, datanodes[j]);
|
|
||||||
done = true;
|
|
||||||
} catch (InvalidBlockTokenException ibte) {
|
|
||||||
if (bgIdx > getLastRetriedIndex()) {
|
|
||||||
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
|
||||||
+ "for file {} for block {} from datanode {}. Will retry "
|
|
||||||
+ "the block once.",
|
|
||||||
getSrc(), block, datanodes[j]);
|
|
||||||
setLastRetriedIndex(bgIdx);
|
|
||||||
done = true; // actually it's not done; but we'll retry
|
|
||||||
bgIdx--; // repeat at bgIdx-th block
|
|
||||||
setRefetchBlocks(true);
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
|
||||||
LOG.warn("src={}" + ", datanodes[{}]={}",
|
|
||||||
getSrc(), j, datanodes[j], ie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return done;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true when sounds good to continue or retry, false when severe
|
|
||||||
* condition or totally failed.
|
|
||||||
*/
|
|
||||||
private void tryDatanode(LocatedStripedBlock blockGroup,
|
|
||||||
StripedBlockInfo stripedBlockInfo,
|
|
||||||
DatanodeInfo datanode) throws IOException {
|
|
||||||
|
|
||||||
try (IOStreamPair pair = getClient().connectToDN(datanode,
|
|
||||||
getTimeout(), blockGroup.getBlockToken())) {
|
|
||||||
|
|
||||||
LOG.debug("write to {}: {}, blockGroup={}",
|
|
||||||
datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup);
|
|
||||||
|
|
||||||
// get block MD5
|
|
||||||
createSender(pair).blockGroupChecksum(stripedBlockInfo,
|
|
||||||
blockGroup.getBlockToken());
|
|
||||||
|
|
||||||
BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
|
|
||||||
PBHelperClient.vintPrefixed(pair.in));
|
|
||||||
|
|
||||||
String logInfo = "for blockGroup " + blockGroup +
|
|
||||||
" from datanode " + datanode;
|
|
||||||
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
|
|
||||||
|
|
||||||
OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
|
|
||||||
|
|
||||||
//read byte-per-checksum
|
|
||||||
final int bpc = checksumData.getBytesPerCrc();
|
|
||||||
if (bgIdx == 0) { //first block
|
|
||||||
setBytesPerCRC(bpc);
|
|
||||||
} else {
|
|
||||||
if (bpc != getBytesPerCRC()) {
|
|
||||||
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
|
|
||||||
+ " but bytesPerCRC=" + getBytesPerCRC());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//read crc-per-block
|
|
||||||
final long cpb = checksumData.getCrcPerBlock();
|
|
||||||
if (getLocatedBlocks().size() > 1 && bgIdx == 0) { // first block
|
|
||||||
setCrcPerBlock(cpb);
|
|
||||||
}
|
|
||||||
|
|
||||||
//read md5
|
|
||||||
final MD5Hash md5 = new MD5Hash(
|
|
||||||
checksumData.getMd5().toByteArray());
|
|
||||||
md5.write(getMd5out());
|
|
||||||
|
|
||||||
// read crc-type
|
|
||||||
final DataChecksum.Type ct;
|
|
||||||
if (checksumData.hasCrcType()) {
|
|
||||||
ct = PBHelperClient.convert(checksumData.getCrcType());
|
|
||||||
} else {
|
|
||||||
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
|
|
||||||
"inferring checksum by reading first byte");
|
|
||||||
ct = getClient().inferChecksumTypeByReading(blockGroup, datanode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bgIdx == 0) {
|
|
||||||
setCrcType(ct);
|
|
||||||
} else if (getCrcType() != DataChecksum.Type.MIXED &&
|
|
||||||
getCrcType() != ct) {
|
|
||||||
// if crc types are mixed in a file
|
|
||||||
setCrcType(DataChecksum.Type.MIXED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
if (bgIdx == 0) {
|
|
||||||
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
|
|
||||||
+ ", crcPerBlock=" + getCrcPerBlock());
|
|
||||||
}
|
|
||||||
LOG.debug("got reply from " + datanode + ": md5=" + md5);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
||||||
|
@ -198,17 +197,6 @@ public interface DataTransferProtocol {
|
||||||
* @param blockToken security token for accessing the block.
|
* @param blockToken security token for accessing the block.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void blockChecksum(ExtendedBlock blk,
|
void blockChecksum(final ExtendedBlock blk,
|
||||||
Token<BlockTokenIdentifier> blockToken) throws IOException;
|
final Token<BlockTokenIdentifier> blockToken) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get striped block group checksum (MD5 of CRC32).
|
|
||||||
*
|
|
||||||
* @param stripedBlockInfo a striped block info.
|
|
||||||
* @param blockToken security token for accessing the block.
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
|
|
||||||
Token<BlockTokenIdentifier> blockToken) throws IOException;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,7 +38,6 @@ public enum Op {
|
||||||
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
|
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
|
||||||
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
|
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
|
||||||
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
|
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
|
||||||
BLOCK_GROUP_CHECKSUM((byte)90),
|
|
||||||
CUSTOM((byte)127);
|
CUSTOM((byte)127);
|
||||||
|
|
||||||
/** The code for this operation. */
|
/** The code for this operation. */
|
||||||
|
|
|
@ -28,13 +28,11 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
|
||||||
|
@ -263,21 +261,4 @@ public class Sender implements DataTransferProtocol {
|
||||||
|
|
||||||
send(out, Op.BLOCK_CHECKSUM, proto);
|
send(out, Op.BLOCK_CHECKSUM, proto);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
|
|
||||||
Token<BlockTokenIdentifier> blockToken) throws IOException {
|
|
||||||
OpBlockGroupChecksumProto proto = OpBlockGroupChecksumProto.newBuilder()
|
|
||||||
.setHeader(DataTransferProtoUtil.buildBaseHeader(
|
|
||||||
stripedBlockInfo.getBlock(), blockToken))
|
|
||||||
.setDatanodes(PBHelperClient.convertToProto(
|
|
||||||
stripedBlockInfo.getDatanodes()))
|
|
||||||
.addAllBlockTokens(PBHelperClient.convert(
|
|
||||||
stripedBlockInfo.getBlockTokens()))
|
|
||||||
.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
|
|
||||||
stripedBlockInfo.getErasureCodingPolicy()))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -553,8 +553,10 @@ public class PBHelperClient {
|
||||||
proto.getCorrupt(),
|
proto.getCorrupt(),
|
||||||
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
|
cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
|
||||||
List<TokenProto> tokenProtos = proto.getBlockTokensList();
|
List<TokenProto> tokenProtos = proto.getBlockTokensList();
|
||||||
Token<BlockTokenIdentifier>[] blockTokens =
|
Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
|
||||||
convertTokens(tokenProtos);
|
for (int i = 0; i < indices.length; i++) {
|
||||||
|
blockTokens[i] = convert(tokenProtos.get(i));
|
||||||
|
}
|
||||||
((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
|
((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
|
||||||
}
|
}
|
||||||
lb.setBlockToken(convert(proto.getBlockToken()));
|
lb.setBlockToken(convert(proto.getBlockToken()));
|
||||||
|
@ -562,18 +564,6 @@ public class PBHelperClient {
|
||||||
return lb;
|
return lb;
|
||||||
}
|
}
|
||||||
|
|
||||||
static public Token<BlockTokenIdentifier>[] convertTokens(
|
|
||||||
List<TokenProto> tokenProtos) {
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
Token<BlockTokenIdentifier>[] blockTokens = new Token[tokenProtos.size()];
|
|
||||||
for (int i = 0; i < blockTokens.length; i++) {
|
|
||||||
blockTokens[i] = convert(tokenProtos.get(i));
|
|
||||||
}
|
|
||||||
|
|
||||||
return blockTokens;
|
|
||||||
}
|
|
||||||
|
|
||||||
static public DatanodeInfo convert(DatanodeInfoProto di) {
|
static public DatanodeInfo convert(DatanodeInfoProto di) {
|
||||||
if (di == null) return null;
|
if (di == null) return null;
|
||||||
return new DatanodeInfo(
|
return new DatanodeInfo(
|
||||||
|
@ -825,7 +815,9 @@ public class PBHelperClient {
|
||||||
byte[] indices = sb.getBlockIndices();
|
byte[] indices = sb.getBlockIndices();
|
||||||
builder.setBlockIndices(PBHelperClient.getByteString(indices));
|
builder.setBlockIndices(PBHelperClient.getByteString(indices));
|
||||||
Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
|
Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
|
||||||
builder.addAllBlockTokens(convert(blockTokens));
|
for (int i = 0; i < indices.length; i++) {
|
||||||
|
builder.addBlockTokens(PBHelperClient.convert(blockTokens[i]));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return builder.setB(PBHelperClient.convert(b.getBlock()))
|
return builder.setB(PBHelperClient.convert(b.getBlock()))
|
||||||
|
@ -833,16 +825,6 @@ public class PBHelperClient {
|
||||||
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
|
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<TokenProto> convert(
|
|
||||||
Token<BlockTokenIdentifier>[] blockTokens) {
|
|
||||||
List<TokenProto> results = new ArrayList<>(blockTokens.length);
|
|
||||||
for (Token<BlockTokenIdentifier> bt : blockTokens) {
|
|
||||||
results.add(convert(bt));
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
|
public static BlockStoragePolicy convert(BlockStoragePolicyProto proto) {
|
||||||
List<StorageTypeProto> cList = proto.getCreationPolicy()
|
List<StorageTypeProto> cList = proto.getCreationPolicy()
|
||||||
.getStorageTypesList();
|
.getStorageTypesList();
|
||||||
|
@ -2518,14 +2500,4 @@ public class PBHelperClient {
|
||||||
.setId(policy.getId());
|
.setId(policy.getId());
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HdfsProtos.DatanodeInfosProto convertToProto(
|
|
||||||
DatanodeInfo[] datanodeInfos) {
|
|
||||||
HdfsProtos.DatanodeInfosProto.Builder builder =
|
|
||||||
HdfsProtos.DatanodeInfosProto.newBuilder();
|
|
||||||
for (DatanodeInfo datanodeInfo : datanodeInfos) {
|
|
||||||
builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,18 +75,6 @@ public class StripedBlockUtil {
|
||||||
|
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
|
public static final Logger LOG = LoggerFactory.getLogger(StripedBlockUtil.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses a striped block group into individual blocks.
|
|
||||||
* @param bg The striped block group
|
|
||||||
* @param ecPolicy The erasure coding policy
|
|
||||||
* @return An array of the blocks in the group
|
|
||||||
*/
|
|
||||||
public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
|
|
||||||
ErasureCodingPolicy ecPolicy) {
|
|
||||||
return parseStripedBlockGroup(bg, ecPolicy.getCellSize(),
|
|
||||||
ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method parses a striped block group into individual blocks.
|
* This method parses a striped block group into individual blocks.
|
||||||
*
|
*
|
||||||
|
|
|
@ -74,6 +74,7 @@ message OpReadBlockProto {
|
||||||
optional CachingStrategyProto cachingStrategy = 5;
|
optional CachingStrategyProto cachingStrategy = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
message ChecksumProto {
|
message ChecksumProto {
|
||||||
required ChecksumTypeProto type = 1;
|
required ChecksumTypeProto type = 1;
|
||||||
required uint32 bytesPerChecksum = 2;
|
required uint32 bytesPerChecksum = 2;
|
||||||
|
@ -148,14 +149,6 @@ message OpBlockChecksumProto {
|
||||||
required BaseHeaderProto header = 1;
|
required BaseHeaderProto header = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message OpBlockGroupChecksumProto {
|
|
||||||
required BaseHeaderProto header = 1;
|
|
||||||
required DatanodeInfosProto datanodes = 2;
|
|
||||||
// each internal block has a block token
|
|
||||||
repeated hadoop.common.TokenProto blockTokens = 3;
|
|
||||||
required ErasureCodingPolicyProto ecPolicy = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An ID uniquely identifying a shared memory segment.
|
* An ID uniquely identifying a shared memory segment.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -26,13 +26,11 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
|
||||||
|
@ -113,9 +111,6 @@ public abstract class Receiver implements DataTransferProtocol {
|
||||||
case BLOCK_CHECKSUM:
|
case BLOCK_CHECKSUM:
|
||||||
opBlockChecksum(in);
|
opBlockChecksum(in);
|
||||||
break;
|
break;
|
||||||
case BLOCK_GROUP_CHECKSUM:
|
|
||||||
opStripedBlockChecksum(in);
|
|
||||||
break;
|
|
||||||
case TRANSFER_BLOCK:
|
case TRANSFER_BLOCK:
|
||||||
opTransferBlock(in);
|
opTransferBlock(in);
|
||||||
break;
|
break;
|
||||||
|
@ -295,27 +290,4 @@ public abstract class Receiver implements DataTransferProtocol {
|
||||||
if (traceScope != null) traceScope.close();
|
if (traceScope != null) traceScope.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Receive OP_STRIPED_BLOCK_CHECKSUM. */
|
|
||||||
private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
|
|
||||||
OpBlockGroupChecksumProto proto =
|
|
||||||
OpBlockGroupChecksumProto.parseFrom(vintPrefixed(dis));
|
|
||||||
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
|
|
||||||
proto.getClass().getSimpleName());
|
|
||||||
StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(
|
|
||||||
PBHelperClient.convert(proto.getHeader().getBlock()),
|
|
||||||
PBHelperClient.convert(proto.getDatanodes()),
|
|
||||||
PBHelperClient.convertTokens(proto.getBlockTokensList()),
|
|
||||||
PBHelperClient.convertErasureCodingPolicy(proto.getEcPolicy())
|
|
||||||
);
|
|
||||||
|
|
||||||
try {
|
|
||||||
blockGroupChecksum(stripedBlockInfo,
|
|
||||||
PBHelperClient.convert(proto.getHeader().getToken()));
|
|
||||||
} finally {
|
|
||||||
if (traceScope != null) {
|
|
||||||
traceScope.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,30 +19,16 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.MD5Hash;
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
|
@ -55,87 +41,13 @@ final class BlockChecksumHelper {
|
||||||
|
|
||||||
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
|
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
|
||||||
|
|
||||||
private BlockChecksumHelper() {
|
private BlockChecksumHelper() {}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The abstract base block checksum computer.
|
* The abstract base block checksum computer.
|
||||||
*/
|
*/
|
||||||
static abstract class AbstractBlockChecksumComputer {
|
static abstract class BlockChecksumComputer {
|
||||||
private final DataNode datanode;
|
private final DataNode datanode;
|
||||||
|
|
||||||
private byte[] outBytes;
|
|
||||||
private int bytesPerCRC = -1;
|
|
||||||
private DataChecksum.Type crcType = null;
|
|
||||||
private long crcPerBlock = -1;
|
|
||||||
private int checksumSize = -1;
|
|
||||||
|
|
||||||
AbstractBlockChecksumComputer(DataNode datanode) throws IOException {
|
|
||||||
this.datanode = datanode;
|
|
||||||
}
|
|
||||||
|
|
||||||
abstract void compute() throws IOException;
|
|
||||||
|
|
||||||
Sender createSender(IOStreamPair pair) {
|
|
||||||
DataOutputStream out = (DataOutputStream) pair.out;
|
|
||||||
return new Sender(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
DataNode getDatanode() {
|
|
||||||
return datanode;
|
|
||||||
}
|
|
||||||
|
|
||||||
InputStream getBlockInputStream(ExtendedBlock block, long seekOffset)
|
|
||||||
throws IOException {
|
|
||||||
return datanode.data.getBlockInputStream(block, seekOffset);
|
|
||||||
}
|
|
||||||
|
|
||||||
void setOutBytes(byte[] bytes) {
|
|
||||||
this.outBytes = bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] getOutBytes() {
|
|
||||||
return outBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
int getBytesPerCRC() {
|
|
||||||
return bytesPerCRC;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBytesPerCRC(int bytesPerCRC) {
|
|
||||||
this.bytesPerCRC = bytesPerCRC;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCrcType(DataChecksum.Type crcType) {
|
|
||||||
this.crcType = crcType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCrcPerBlock(long crcPerBlock) {
|
|
||||||
this.crcPerBlock = crcPerBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setChecksumSize(int checksumSize) {
|
|
||||||
this.checksumSize = checksumSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
DataChecksum.Type getCrcType() {
|
|
||||||
return crcType;
|
|
||||||
}
|
|
||||||
|
|
||||||
long getCrcPerBlock() {
|
|
||||||
return crcPerBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
int getChecksumSize() {
|
|
||||||
return checksumSize;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The abstract base block checksum computer.
|
|
||||||
*/
|
|
||||||
static abstract class BlockChecksumComputer
|
|
||||||
extends AbstractBlockChecksumComputer {
|
|
||||||
private final ExtendedBlock block;
|
private final ExtendedBlock block;
|
||||||
// client side now can specify a range of the block for checksum
|
// client side now can specify a range of the block for checksum
|
||||||
private final long requestLength;
|
private final long requestLength;
|
||||||
|
@ -144,12 +56,17 @@ final class BlockChecksumHelper {
|
||||||
private final long visibleLength;
|
private final long visibleLength;
|
||||||
private final boolean partialBlk;
|
private final boolean partialBlk;
|
||||||
|
|
||||||
|
private byte[] outBytes;
|
||||||
|
private int bytesPerCRC = -1;
|
||||||
|
private DataChecksum.Type crcType = null;
|
||||||
|
private long crcPerBlock = -1;
|
||||||
|
private int checksumSize = -1;
|
||||||
private BlockMetadataHeader header;
|
private BlockMetadataHeader header;
|
||||||
private DataChecksum checksum;
|
private DataChecksum checksum;
|
||||||
|
|
||||||
BlockChecksumComputer(DataNode datanode,
|
BlockChecksumComputer(DataNode datanode,
|
||||||
ExtendedBlock block) throws IOException {
|
ExtendedBlock block) throws IOException {
|
||||||
super(datanode);
|
this.datanode = datanode;
|
||||||
this.block = block;
|
this.block = block;
|
||||||
this.requestLength = block.getNumBytes();
|
this.requestLength = block.getNumBytes();
|
||||||
Preconditions.checkArgument(requestLength >= 0);
|
Preconditions.checkArgument(requestLength >= 0);
|
||||||
|
@ -164,80 +81,98 @@ final class BlockChecksumHelper {
|
||||||
new BufferedInputStream(metadataIn, ioFileBufferSize));
|
new BufferedInputStream(metadataIn, ioFileBufferSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
Sender createSender(IOStreamPair pair) {
|
protected DataNode getDatanode() {
|
||||||
DataOutputStream out = (DataOutputStream) pair.out;
|
return datanode;
|
||||||
return new Sender(out);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ExtendedBlock getBlock() {
|
||||||
ExtendedBlock getBlock() {
|
|
||||||
return block;
|
return block;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getRequestLength() {
|
protected long getRequestLength() {
|
||||||
return requestLength;
|
return requestLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
LengthInputStream getMetadataIn() {
|
protected LengthInputStream getMetadataIn() {
|
||||||
return metadataIn;
|
return metadataIn;
|
||||||
}
|
}
|
||||||
|
|
||||||
DataInputStream getChecksumIn() {
|
protected DataInputStream getChecksumIn() {
|
||||||
return checksumIn;
|
return checksumIn;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getVisibleLength() {
|
protected long getVisibleLength() {
|
||||||
return visibleLength;
|
return visibleLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isPartialBlk() {
|
protected boolean isPartialBlk() {
|
||||||
return partialBlk;
|
return partialBlk;
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockMetadataHeader getHeader() {
|
protected void setOutBytes(byte[] bytes) {
|
||||||
|
this.outBytes = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected byte[] getOutBytes() {
|
||||||
|
return outBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getBytesPerCRC() {
|
||||||
|
return bytesPerCRC;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected DataChecksum.Type getCrcType() {
|
||||||
|
return crcType;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getCrcPerBlock() {
|
||||||
|
return crcPerBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getChecksumSize() {
|
||||||
|
return checksumSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BlockMetadataHeader getHeader() {
|
||||||
return header;
|
return header;
|
||||||
}
|
}
|
||||||
|
|
||||||
DataChecksum getChecksum() {
|
protected DataChecksum getChecksum() {
|
||||||
return checksum;
|
return checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform the block checksum computing.
|
* Perform the block checksum computing.
|
||||||
*
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
abstract void compute() throws IOException;
|
abstract void compute() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read block metadata header.
|
* Read block metadata header.
|
||||||
*
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void readHeader() throws IOException {
|
protected void readHeader() throws IOException {
|
||||||
//read metadata file
|
//read metadata file
|
||||||
header = BlockMetadataHeader.readHeader(checksumIn);
|
header = BlockMetadataHeader.readHeader(checksumIn);
|
||||||
checksum = header.getChecksum();
|
checksum = header.getChecksum();
|
||||||
setChecksumSize(checksum.getChecksumSize());
|
checksumSize = checksum.getChecksumSize();
|
||||||
setBytesPerCRC(checksum.getBytesPerChecksum());
|
bytesPerCRC = checksum.getBytesPerChecksum();
|
||||||
long crcPerBlock = checksum.getChecksumSize() <= 0 ? 0 :
|
crcPerBlock = checksumSize <= 0 ? 0 :
|
||||||
(metadataIn.getLength() -
|
(metadataIn.getLength() -
|
||||||
BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize();
|
BlockMetadataHeader.getHeaderSize()) / checksumSize;
|
||||||
setCrcPerBlock(crcPerBlock);
|
crcType = checksum.getChecksumType();
|
||||||
setCrcType(checksum.getChecksumType());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculate partial block checksum.
|
* Calculate partial block checksum.
|
||||||
*
|
|
||||||
* @return
|
* @return
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
byte[] crcPartialBlock() throws IOException {
|
protected byte[] crcPartialBlock() throws IOException {
|
||||||
int partialLength = (int) (requestLength % getBytesPerCRC());
|
int partialLength = (int) (requestLength % bytesPerCRC);
|
||||||
if (partialLength > 0) {
|
if (partialLength > 0) {
|
||||||
byte[] buf = new byte[partialLength];
|
byte[] buf = new byte[partialLength];
|
||||||
final InputStream blockIn = getBlockInputStream(block,
|
final InputStream blockIn = datanode.data.getBlockInputStream(block,
|
||||||
requestLength - partialLength);
|
requestLength - partialLength);
|
||||||
try {
|
try {
|
||||||
// Get the CRC of the partialLength.
|
// Get the CRC of the partialLength.
|
||||||
|
@ -246,7 +181,7 @@ final class BlockChecksumHelper {
|
||||||
IOUtils.closeStream(blockIn);
|
IOUtils.closeStream(blockIn);
|
||||||
}
|
}
|
||||||
checksum.update(buf, 0, partialLength);
|
checksum.update(buf, 0, partialLength);
|
||||||
byte[] partialCrc = new byte[getChecksumSize()];
|
byte[] partialCrc = new byte[checksumSize];
|
||||||
checksum.writeValue(partialCrc, 0, true);
|
checksum.writeValue(partialCrc, 0, true);
|
||||||
return partialCrc;
|
return partialCrc;
|
||||||
}
|
}
|
||||||
|
@ -294,7 +229,7 @@ final class BlockChecksumHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
private MD5Hash checksumPartialBlock() throws IOException {
|
private MD5Hash checksumPartialBlock() throws IOException {
|
||||||
byte[] buffer = new byte[4 * 1024];
|
byte[] buffer = new byte[4*1024];
|
||||||
MessageDigest digester = MD5Hash.getDigester();
|
MessageDigest digester = MD5Hash.getDigester();
|
||||||
|
|
||||||
long remaining = (getRequestLength() / getBytesPerCRC())
|
long remaining = (getRequestLength() / getBytesPerCRC())
|
||||||
|
@ -316,115 +251,4 @@ final class BlockChecksumHelper {
|
||||||
return new MD5Hash(digester.digest());
|
return new MD5Hash(digester.digest());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Non-striped block group checksum computer for striped blocks.
|
|
||||||
*/
|
|
||||||
static class BlockGroupNonStripedChecksumComputer
|
|
||||||
extends AbstractBlockChecksumComputer {
|
|
||||||
|
|
||||||
private final ExtendedBlock blockGroup;
|
|
||||||
private final ErasureCodingPolicy ecPolicy;
|
|
||||||
private final DatanodeInfo[] datanodes;
|
|
||||||
private final Token<BlockTokenIdentifier>[] blockTokens;
|
|
||||||
|
|
||||||
private final DataOutputBuffer md5writer = new DataOutputBuffer();
|
|
||||||
|
|
||||||
BlockGroupNonStripedChecksumComputer(DataNode datanode,
|
|
||||||
StripedBlockInfo stripedBlockInfo)
|
|
||||||
throws IOException {
|
|
||||||
super(datanode);
|
|
||||||
this.blockGroup = stripedBlockInfo.getBlock();
|
|
||||||
this.ecPolicy = stripedBlockInfo.getErasureCodingPolicy();
|
|
||||||
this.datanodes = stripedBlockInfo.getDatanodes();
|
|
||||||
this.blockTokens = stripedBlockInfo.getBlockTokens();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void compute() throws IOException {
|
|
||||||
for (int idx = 0; idx < ecPolicy.getNumDataUnits(); idx++) {
|
|
||||||
ExtendedBlock block =
|
|
||||||
StripedBlockUtil.constructInternalBlock(blockGroup,
|
|
||||||
ecPolicy.getCellSize(), ecPolicy.getNumDataUnits(), idx);
|
|
||||||
DatanodeInfo targetDatanode = datanodes[idx];
|
|
||||||
Token<BlockTokenIdentifier> blockToken = blockTokens[idx];
|
|
||||||
checksumBlock(block, idx, blockToken, targetDatanode);
|
|
||||||
}
|
|
||||||
|
|
||||||
MD5Hash md5out = MD5Hash.digest(md5writer.getData());
|
|
||||||
setOutBytes(md5out.getDigest());
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checksumBlock(ExtendedBlock block, int blockIdx,
|
|
||||||
Token<BlockTokenIdentifier> blockToken,
|
|
||||||
DatanodeInfo targetDatanode) throws IOException {
|
|
||||||
int timeout = 3000;
|
|
||||||
try (IOStreamPair pair = getDatanode().connectToDN(targetDatanode,
|
|
||||||
timeout, block, blockToken)) {
|
|
||||||
|
|
||||||
LOG.debug("write to {}: {}, block={}",
|
|
||||||
getDatanode(), Op.BLOCK_CHECKSUM, block);
|
|
||||||
|
|
||||||
// get block MD5
|
|
||||||
createSender(pair).blockChecksum(block, blockToken);
|
|
||||||
|
|
||||||
final DataTransferProtos.BlockOpResponseProto reply =
|
|
||||||
DataTransferProtos.BlockOpResponseProto.parseFrom(
|
|
||||||
PBHelperClient.vintPrefixed(pair.in));
|
|
||||||
|
|
||||||
String logInfo = "for block " + block
|
|
||||||
+ " from datanode " + targetDatanode;
|
|
||||||
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
|
|
||||||
|
|
||||||
DataTransferProtos.OpBlockChecksumResponseProto checksumData =
|
|
||||||
reply.getChecksumResponse();
|
|
||||||
|
|
||||||
//read byte-per-checksum
|
|
||||||
final int bpc = checksumData.getBytesPerCrc();
|
|
||||||
if (blockIdx == 0) { //first block
|
|
||||||
setBytesPerCRC(bpc);
|
|
||||||
} else if (bpc != getBytesPerCRC()) {
|
|
||||||
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
|
|
||||||
+ " but bytesPerCRC=" + getBytesPerCRC());
|
|
||||||
}
|
|
||||||
|
|
||||||
//read crc-per-block
|
|
||||||
final long cpb = checksumData.getCrcPerBlock();
|
|
||||||
if (blockIdx == 0) {
|
|
||||||
setCrcPerBlock(cpb);
|
|
||||||
}
|
|
||||||
|
|
||||||
//read md5
|
|
||||||
final MD5Hash md5 = new MD5Hash(
|
|
||||||
checksumData.getMd5().toByteArray());
|
|
||||||
md5.write(md5writer);
|
|
||||||
|
|
||||||
// read crc-type
|
|
||||||
final DataChecksum.Type ct;
|
|
||||||
if (checksumData.hasCrcType()) {
|
|
||||||
ct = PBHelperClient.convert(checksumData.getCrcType());
|
|
||||||
} else {
|
|
||||||
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
|
|
||||||
"inferring checksum by reading first byte");
|
|
||||||
ct = DataChecksum.Type.DEFAULT;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (blockIdx == 0) { // first block
|
|
||||||
setCrcType(ct);
|
|
||||||
} else if (getCrcType() != DataChecksum.Type.MIXED &&
|
|
||||||
getCrcType() != ct) {
|
|
||||||
// if crc types are mixed in a file
|
|
||||||
setCrcType(DataChecksum.Type.MIXED);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
if (blockIdx == 0) {
|
|
||||||
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
|
|
||||||
+ ", crcPerBlock=" + getCrcPerBlock());
|
|
||||||
}
|
|
||||||
LOG.debug("got reply from " + targetDatanode + ": md5=" + md5);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.StripedBlockInfo;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
|
@ -47,9 +46,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
|
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.AbstractBlockChecksumComputer;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
|
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockGroupNonStripedChecksumComputer;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
|
||||||
|
@ -926,46 +923,6 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
datanode.metrics.addBlockChecksumOp(elapsed());
|
datanode.metrics.addBlockChecksumOp(elapsed());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo,
|
|
||||||
final Token<BlockTokenIdentifier> blockToken)
|
|
||||||
throws IOException {
|
|
||||||
updateCurrentThreadName("Getting checksum for block group" +
|
|
||||||
stripedBlockInfo.getBlock());
|
|
||||||
final DataOutputStream out = new DataOutputStream(getOutputStream());
|
|
||||||
checkAccess(out, true, stripedBlockInfo.getBlock(), blockToken,
|
|
||||||
Op.BLOCK_GROUP_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
|
|
||||||
|
|
||||||
AbstractBlockChecksumComputer maker =
|
|
||||||
new BlockGroupNonStripedChecksumComputer(datanode, stripedBlockInfo);
|
|
||||||
|
|
||||||
try {
|
|
||||||
maker.compute();
|
|
||||||
|
|
||||||
//write reply
|
|
||||||
BlockOpResponseProto.newBuilder()
|
|
||||||
.setStatus(SUCCESS)
|
|
||||||
.setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
|
|
||||||
.setBytesPerCrc(maker.getBytesPerCRC())
|
|
||||||
.setCrcPerBlock(maker.getCrcPerBlock())
|
|
||||||
.setMd5(ByteString.copyFrom(maker.getOutBytes()))
|
|
||||||
.setCrcType(PBHelperClient.convert(maker.getCrcType())))
|
|
||||||
.build()
|
|
||||||
.writeDelimitedTo(out);
|
|
||||||
out.flush();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.info("blockChecksum " + stripedBlockInfo.getBlock() +
|
|
||||||
" received exception " + ioe);
|
|
||||||
incrDatanodeNetworkErrors();
|
|
||||||
throw ioe;
|
|
||||||
} finally {
|
|
||||||
IOUtils.closeStream(out);
|
|
||||||
}
|
|
||||||
|
|
||||||
//update metrics
|
|
||||||
datanode.metrics.addBlockChecksumOp(elapsed());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void copyBlock(final ExtendedBlock block,
|
public void copyBlock(final ExtendedBlock block,
|
||||||
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
final Token<BlockTokenIdentifier> blockToken) throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue