HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum. Contributed by Kai Zheng

This commit is contained in:
Uma Maheswara Rao G 2016-02-29 21:52:20 -08:00
parent 680f3fc02d
commit 307ec80aca
12 changed files with 845 additions and 323 deletions

View File

@ -261,8 +261,10 @@ public class IOUtils {
* @param stream the Stream to close * @param stream the Stream to close
*/ */
public static void closeStream(java.io.Closeable stream) { public static void closeStream(java.io.Closeable stream) {
if (stream != null) {
cleanup(null, stream); cleanup(null, stream);
} }
}
/** /**
* Closes the socket ignoring {@link IOException} * Closes the socket ignoring {@link IOException}

View File

@ -128,6 +128,17 @@ public class MD5Hash implements WritableComparable<MD5Hash> {
return new MD5Hash(digest); return new MD5Hash(digest);
} }
/** Construct a hash value for an array of byte array. */
public static MD5Hash digest(byte[][] dataArr, int start, int len) {
byte[] digest;
MessageDigest digester = getDigester();
for (byte[] data : dataArr) {
digester.update(data, start, len);
}
digest = digester.digest();
return new MD5Hash(digest);
}
/** Construct a hash value for a String. */ /** Construct a hash value for a String. */
public static MD5Hash digest(String string) { public static MD5Hash digest(String string) {
return digest(UTF8.getBytes(string)); return digest(UTF8.getBytes(string));

View File

@ -45,7 +45,7 @@ public class DataChecksum implements Checksum {
public static final int CHECKSUM_MIXED = 4; public static final int CHECKSUM_MIXED = 4;
/** The checksum types */ /** The checksum types */
public static enum Type { public enum Type {
NULL (CHECKSUM_NULL, 0), NULL (CHECKSUM_NULL, 0),
CRC32 (CHECKSUM_CRC32, 4), CRC32 (CHECKSUM_CRC32, 4),
CRC32C(CHECKSUM_CRC32C, 4), CRC32C(CHECKSUM_CRC32C, 4),
@ -55,7 +55,7 @@ public class DataChecksum implements Checksum {
public final int id; public final int id;
public final int size; public final int size;
private Type(int id, int size) { Type(int id, int size) {
this.id = id; this.id = id;
this.size = size; this.size = size;
} }
@ -230,17 +230,21 @@ public class DataChecksum implements Checksum {
public Type getChecksumType() { public Type getChecksumType() {
return type; return type;
} }
/** @return the size for a checksum. */ /** @return the size for a checksum. */
public int getChecksumSize() { public int getChecksumSize() {
return type.size; return type.size;
} }
/** @return the required checksum size given the data length. */ /** @return the required checksum size given the data length. */
public int getChecksumSize(int dataSize) { public int getChecksumSize(int dataSize) {
return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize(); return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
} }
public int getBytesPerChecksum() { public int getBytesPerChecksum() {
return bytesPerChecksum; return bytesPerChecksum;
} }
public int getNumBytesInSum() { public int getNumBytesInSum() {
return inSum; return inSum;
} }
@ -249,16 +253,19 @@ public class DataChecksum implements Checksum {
static public int getChecksumHeaderSize() { static public int getChecksumHeaderSize() {
return 1 + SIZE_OF_INTEGER; // type byte, bytesPerChecksum int return 1 + SIZE_OF_INTEGER; // type byte, bytesPerChecksum int
} }
//Checksum Interface. Just a wrapper around member summer. //Checksum Interface. Just a wrapper around member summer.
@Override @Override
public long getValue() { public long getValue() {
return summer.getValue(); return summer.getValue();
} }
@Override @Override
public void reset() { public void reset() {
summer.reset(); summer.reset();
inSum = 0; inSum = 0;
} }
@Override @Override
public void update( byte[] b, int off, int len ) { public void update( byte[] b, int off, int len ) {
if ( len > 0 ) { if ( len > 0 ) {
@ -266,6 +273,7 @@ public class DataChecksum implements Checksum {
inSum += len; inSum += len;
} }
} }
@Override @Override
public void update( int b ) { public void update( int b ) {
summer.update( b ); summer.update( b );

View File

@ -27,12 +27,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_LOCA
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -80,9 +77,7 @@ import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.ParentNotDirectoryException;
@ -138,7 +133,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure; import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
@ -146,20 +140,16 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.util.IOUtilsClient; import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
@ -1293,7 +1283,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/** /**
* Invoke namenode append RPC. * Invoke namenode append RPC.
* It retries in case of {@link BlockNotYetCompleteException}. * It retries in case of some {@link RetriableException}.
*/ */
private LastBlockWithStatus callAppend(String src, private LastBlockWithStatus callAppend(String src,
EnumSetWritable<CreateFlag> flag) throws IOException { EnumSetWritable<CreateFlag> flag) throws IOException {
@ -1695,7 +1685,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
} }
/** /**
* Get the checksum of the whole file of a range of the file. Note that the * Get the checksum of the whole file or a range of the file. Note that the
* range always starts from the beginning of the file. * range always starts from the beginning of the file.
* @param src The file path * @param src The file path
* @param length the length of the range, i.e., the range is [0, length] * @param length the length of the range, i.e., the range is [0, length]
@ -1706,9 +1696,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException { throws IOException {
checkOpen(); checkOpen();
Preconditions.checkArgument(length >= 0); Preconditions.checkArgument(length >= 0);
LocatedBlocks blockLocations = getBlockLocations(src, length);
FileChecksumHelper.FileChecksumComputer maker =
new FileChecksumHelper.ReplicatedFileChecksumComputer(src, length,
blockLocations, namenode, this);
maker.compute();
return maker.getFileChecksum();
}
protected LocatedBlocks getBlockLocations(String src,
long length) throws IOException {
//get block locations for the file range //get block locations for the file range
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, LocatedBlocks blockLocations = callGetBlockLocations(namenode,
length); src, 0, length);
if (null == blockLocations) { if (null == blockLocations) {
throw new FileNotFoundException("File does not exist: " + src); throw new FileNotFoundException("File does not exist: " + src);
} }
@ -1716,194 +1720,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throw new IOException("Fail to get checksum, since file " + src throw new IOException("Fail to get checksum, since file " + src
+ " is under construction."); + " is under construction.");
} }
List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
final DataOutputBuffer md5out = new DataOutputBuffer();
int bytesPerCRC = -1;
DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
long crcPerBlock = 0;
boolean refetchBlocks = false;
int lastRetriedIndex = -1;
// get block checksum for each block return blockLocations;
long remaining = length;
if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
remaining = Math.min(length, blockLocations.getFileLength());
}
for(int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
if (refetchBlocks) { // refetch to get fresh tokens
blockLocations = callGetBlockLocations(namenode, src, 0, length);
if (null == blockLocations) {
throw new FileNotFoundException("File does not exist: " + src);
}
if (blockLocations.isUnderConstruction()) {
throw new IOException("Fail to get checksum, since file " + src
+ " is under construction.");
}
locatedblocks = blockLocations.getLocatedBlocks();
refetchBlocks = false;
}
LocatedBlock lb = locatedblocks.get(i);
final ExtendedBlock block = lb.getBlock();
if (remaining < block.getNumBytes()) {
block.setNumBytes(remaining);
}
remaining -= block.getNumBytes();
final DatanodeInfo[] datanodes = lb.getLocations();
//try each datanode location of the block
final int timeout = 3000 * datanodes.length +
dfsClientConf.getSocketTimeout();
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
DataOutputStream out = null;
DataInputStream in = null;
try {
//connect to a datanode
IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
out = new DataOutputStream(new BufferedOutputStream(pair.out,
smallBufferSize));
in = new DataInputStream(pair.in);
LOG.debug("write to {}: {}, block={}",
datanodes[j], Op.BLOCK_CHECKSUM, block);
// get block MD5
new Sender(out).blockChecksum(block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
String logInfo = "for block " + block + " from datanode " +
datanodes[j];
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse();
//read byte-per-checksum
final int bpc = checksumData.getBytesPerCrc();
if (i == 0) { //first block
bytesPerCRC = bpc;
}
else if (bpc != bytesPerCRC) {
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ " but bytesPerCRC=" + bytesPerCRC);
} }
//read crc-per-block protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
final long cpb = checksumData.getCrcPerBlock(); Token<BlockTokenIdentifier> blockToken)
if (locatedblocks.size() > 1 && i == 0) { throws IOException {
crcPerBlock = cpb; return DFSUtilClient.connectToDN(dn, timeout, conf, saslClient,
} socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
md5.write(md5out);
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData
.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
ct = inferChecksumTypeByReading(lb, datanodes[j]);
}
if (i == 0) { // first block
crcType = ct;
} else if (crcType != DataChecksum.Type.MIXED
&& crcType != ct) {
// if crc types are mixed in a file
crcType = DataChecksum.Type.MIXED;
}
done = true;
if (LOG.isDebugEnabled()) {
if (i == 0) {
LOG.debug("set bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock);
}
LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
}
} catch (InvalidBlockTokenException ibte) {
if (i > lastRetriedIndex) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file {} for block {} from datanode {}. Will retry "
+ "the block once.",
src, block, datanodes[j]);
lastRetriedIndex = i;
done = true; // actually it's not done; but we'll retry
i--; // repeat at i-th block
refetchBlocks = true;
break;
}
} catch (IOException ie) {
LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
if (!done) {
throw new IOException("Fail to get block MD5 for " + block);
}
}
//compute file MD5
final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
switch (crcType) {
case CRC32:
return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
crcPerBlock, fileMD5);
case CRC32C:
return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
crcPerBlock, fileMD5);
default:
// If there is no block allocated for the file,
// return one with the magic entry that matches what previous
// hdfs versions return.
if (locatedblocks.size() == 0) {
return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
}
// we should never get here since the validity was checked
// when getCrcType() was called above.
return null;
}
}
/**
* Connect to the given datanode's datantrasfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
LocatedBlock lb) throws IOException {
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(getConf().isConnectToDnViaHostname());
LOG.debug("Connecting to datanode {}", dnAddr);
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setTcpNoDelay(dfsClientConf.getDataTransferTcpNoDelay());
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
lb.getBlockToken(), dn);
success = true;
return ret;
} finally {
if (!success) {
IOUtils.closeSocket(sock);
}
}
} }
/** /**
@ -1917,19 +1742,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @return the inferred checksum type * @return the inferred checksum type
* @throws IOException if an error occurs * @throws IOException if an error occurs
*/ */
private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn) protected Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
throws IOException { throws IOException {
IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(), lb); IOStreamPair pair = connectToDN(dn, dfsClientConf.getSocketTimeout(),
lb.getBlockToken());
try { try {
DataOutputStream out = new DataOutputStream( new Sender((DataOutputStream) pair.out).readBlock(lb.getBlock(),
new BufferedOutputStream(pair.out, smallBufferSize)); lb.getBlockToken(), clientName,
DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy()); 0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply = final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(pair.in));
String logInfo = "trying to read " + lb.getBlock() + " from datanode " + String logInfo = "trying to read " + lb.getBlock() + " from datanode " +
dn; dn;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol; import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.IOUtilsClient; import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -56,8 +58,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -717,4 +724,46 @@ public class DFSUtilClient {
return corruptionMap; return corruptionMap;
} }
} }
/**
* Connect to the given datanode's datantrasfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
public static IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
Configuration conf,
SaslDataTransferClient saslClient,
SocketFactory socketFactory,
boolean connectToDnViaHostname,
DataEncryptionKeyFactory dekFactory,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(connectToDnViaHostname);
LOG.debug("Connecting to datanode {}", dnAddr);
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair pair = saslClient.newSocketSend(sock, unbufOut,
unbufIn, dekFactory, blockToken, dn);
IOStreamPair result = new IOStreamPair(
new DataInputStream(pair.in),
new DataOutputStream(new BufferedOutputStream(pair.out,
DFSUtilClient.getSmallBufferSize(conf)))
);
success = true;
return result;
} finally {
if (!success) {
IOUtils.closeSocket(sock);
}
}
}
} }

View File

@ -0,0 +1,416 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
/**
* Utility classes to compute file checksum for both replicated and striped
* files.
*/
final class FileChecksumHelper {
static final Logger LOG =
LoggerFactory.getLogger(FileChecksumHelper.class);
private FileChecksumHelper() {}
/**
* A common abstract class to compute file checksum.
*/
static abstract class FileChecksumComputer {
private final String src;
private final long length;
private final DFSClient client;
private final ClientProtocol namenode;
private final DataOutputBuffer md5out = new DataOutputBuffer();
private MD5MD5CRC32FileChecksum fileChecksum;
private LocatedBlocks blockLocations;
private int timeout;
private List<LocatedBlock> locatedBlocks;
private long remaining = 0L;
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
private long crcPerBlock = 0;
private boolean refetchBlocks = false;
private int lastRetriedIndex = -1;
/**
* Constructor that accepts all the input parameters for the computing.
*/
FileChecksumComputer(String src, long length,
LocatedBlocks blockLocations,
ClientProtocol namenode,
DFSClient client) throws IOException {
this.src = src;
this.length = length;
this.blockLocations = blockLocations;
this.namenode = namenode;
this.client = client;
this.remaining = length;
if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
this.remaining = Math.min(length, blockLocations.getFileLength());
}
this.locatedBlocks = blockLocations.getLocatedBlocks();
}
String getSrc() {
return src;
}
long getLength() {
return length;
}
DFSClient getClient() {
return client;
}
ClientProtocol getNamenode() {
return namenode;
}
DataOutputBuffer getMd5out() {
return md5out;
}
MD5MD5CRC32FileChecksum getFileChecksum() {
return fileChecksum;
}
LocatedBlocks getBlockLocations() {
return blockLocations;
}
void setBlockLocations(LocatedBlocks blockLocations) {
this.blockLocations = blockLocations;
}
int getTimeout() {
return timeout;
}
void setTimeout(int timeout) {
this.timeout = timeout;
}
List<LocatedBlock> getLocatedBlocks() {
return locatedBlocks;
}
void setLocatedBlocks(List<LocatedBlock> locatedBlocks) {
this.locatedBlocks = locatedBlocks;
}
long getRemaining() {
return remaining;
}
void setRemaining(long remaining) {
this.remaining = remaining;
}
int getBytesPerCRC() {
return bytesPerCRC;
}
void setBytesPerCRC(int bytesPerCRC) {
this.bytesPerCRC = bytesPerCRC;
}
DataChecksum.Type getCrcType() {
return crcType;
}
void setCrcType(DataChecksum.Type crcType) {
this.crcType = crcType;
}
long getCrcPerBlock() {
return crcPerBlock;
}
void setCrcPerBlock(long crcPerBlock) {
this.crcPerBlock = crcPerBlock;
}
boolean isRefetchBlocks() {
return refetchBlocks;
}
void setRefetchBlocks(boolean refetchBlocks) {
this.refetchBlocks = refetchBlocks;
}
int getLastRetriedIndex() {
return lastRetriedIndex;
}
void setLastRetriedIndex(int lastRetriedIndex) {
this.lastRetriedIndex = lastRetriedIndex;
}
/**
* Perform the file checksum computing. The intermediate results are stored
* in the object and will be used later.
* @throws IOException
*/
void compute() throws IOException {
checksumBlocks();
fileChecksum = makeFinalResult();
}
/**
* Compute and aggregate block checksums block by block.
* @throws IOException
*/
abstract void checksumBlocks() throws IOException;
/**
* Make final file checksum result given the computing process done.
*/
MD5MD5CRC32FileChecksum makeFinalResult() {
//compute file MD5
final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
switch (crcType) {
case CRC32:
return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
crcPerBlock, fileMD5);
case CRC32C:
return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
crcPerBlock, fileMD5);
default:
// If there is no block allocated for the file,
// return one with the magic entry that matches what previous
// hdfs versions return.
if (locatedBlocks.isEmpty()) {
return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
}
// we should never get here since the validity was checked
// when getCrcType() was called above.
return null;
}
}
/**
* Create and return a sender given an IO stream pair.
*/
Sender createSender(IOStreamPair pair) {
DataOutputStream out = (DataOutputStream) pair.out;
return new Sender(out);
}
/**
* Close an IO stream pair.
*/
void close(IOStreamPair pair) {
if (pair != null) {
IOUtils.closeStream(pair.in);
IOUtils.closeStream(pair.out);
}
}
}
/**
* Replicated file checksum computer.
*/
static class ReplicatedFileChecksumComputer extends FileChecksumComputer {
private int blockIdx;
ReplicatedFileChecksumComputer(String src, long length,
LocatedBlocks blockLocations,
ClientProtocol namenode,
DFSClient client) throws IOException {
super(src, length, blockLocations, namenode, client);
}
@Override
void checksumBlocks() throws IOException {
// get block checksum for each block
for (blockIdx = 0;
blockIdx < getLocatedBlocks().size() && getRemaining() >= 0;
blockIdx++) {
if (isRefetchBlocks()) { // refetch to get fresh tokens
setBlockLocations(getClient().getBlockLocations(getSrc(),
getLength()));
setLocatedBlocks(getBlockLocations().getLocatedBlocks());
setRefetchBlocks(false);
}
LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx);
if (!checksumBlock(locatedBlock)) {
throw new IOException("Fail to get block MD5 for " + locatedBlock);
}
}
}
/**
* Return true when sounds good to continue or retry, false when severe
* condition or totally failed.
*/
private boolean checksumBlock(
LocatedBlock locatedBlock) throws IOException {
ExtendedBlock block = locatedBlock.getBlock();
if (getRemaining() < block.getNumBytes()) {
block.setNumBytes(getRemaining());
}
setRemaining(getRemaining() - block.getNumBytes());
DatanodeInfo[] datanodes = locatedBlock.getLocations();
int tmpTimeout = 3000 * datanodes.length +
getClient().getConf().getSocketTimeout();
setTimeout(tmpTimeout);
//try each datanode location of the block
boolean done = false;
for (int j = 0; !done && j < datanodes.length; j++) {
try {
tryDatanode(locatedBlock, datanodes[j]);
done = true;
} catch (InvalidBlockTokenException ibte) {
if (blockIdx > getLastRetriedIndex()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file {} for block {} from datanode {}. Will retry "
+ "the block once.",
getSrc(), block, datanodes[j]);
setLastRetriedIndex(blockIdx);
done = true; // actually it's not done; but we'll retry
blockIdx--; // repeat at blockIdx-th block
setRefetchBlocks(true);
}
} catch (IOException ie) {
LOG.warn("src={}" + ", datanodes[{}]={}",
getSrc(), j, datanodes[j], ie);
}
}
return done;
}
/**
* Try one replica or datanode to compute the block checksum given a block.
*/
private void tryDatanode(LocatedBlock locatedBlock,
DatanodeInfo datanode) throws IOException {
ExtendedBlock block = locatedBlock.getBlock();
try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(),
locatedBlock.getBlockToken())) {
LOG.debug("write to {}: {}, block={}", datanode,
Op.BLOCK_CHECKSUM, block);
// get block MD5
createSender(pair).blockChecksum(block,
locatedBlock.getBlockToken());
final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(pair.in));
String logInfo = "for block " + block + " from datanode " +
datanode;
DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo);
OpBlockChecksumResponseProto checksumData =
reply.getChecksumResponse();
//read byte-per-checksum
final int bpc = checksumData.getBytesPerCrc();
if (blockIdx == 0) { //first block
setBytesPerCRC(bpc);
} else if (bpc != getBytesPerCRC()) {
throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
+ " but bytesPerCRC=" + getBytesPerCRC());
}
//read crc-per-block
final long cpb = checksumData.getCrcPerBlock();
if (getLocatedBlocks().size() > 1 && blockIdx == 0) {
setCrcPerBlock(cpb);
}
//read md5
final MD5Hash md5 = new MD5Hash(
checksumData.getMd5().toByteArray());
md5.write(getMd5out());
// read crc-type
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelperClient.convert(checksumData
.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode);
}
if (blockIdx == 0) { // first block
setCrcType(ct);
} else if (getCrcType() != DataChecksum.Type.MIXED
&& getCrcType() != ct) {
// if crc types are mixed in a file
setCrcType(DataChecksum.Type.MIXED);
}
if (LOG.isDebugEnabled()) {
if (blockIdx == 0) {
LOG.debug("set bytesPerCRC=" + getBytesPerCRC()
+ ", crcPerBlock=" + getCrcPerBlock());
}
LOG.debug("got reply from " + datanode + ": md5=" + md5);
}
}
}
}
}

View File

@ -17,16 +17,19 @@
*/ */
package org.apache.hadoop.hdfs.protocol.datatransfer; package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.IOUtils;
/** /**
* A little struct class to wrap an InputStream and an OutputStream. * A little struct class to wrap an InputStream and an OutputStream.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IOStreamPair { public class IOStreamPair implements Closeable {
public final InputStream in; public final InputStream in;
public final OutputStream out; public final OutputStream out;
@ -34,4 +37,10 @@ public class IOStreamPair {
this.in = in; this.in = in;
this.out = out; this.out = out;
} }
@Override
public void close() throws IOException {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
} }

View File

@ -233,6 +233,9 @@ Trunk (Unreleased)
HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo) HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo)
HDFS-9733. Refactor DFSClient#getFileChecksum and DataXceiver#blockChecksum
(Kai Zheng via umamahesh)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
/**
* Utilities for Block checksum computing, for both replicated and striped
* blocks.
*/
final class BlockChecksumHelper {
static final Logger LOG = LoggerFactory.getLogger(BlockChecksumHelper.class);
private BlockChecksumHelper() {}
/**
* The abstract base block checksum computer.
*/
static abstract class BlockChecksumComputer {
private final DataNode datanode;
private final ExtendedBlock block;
// client side now can specify a range of the block for checksum
private final long requestLength;
private final LengthInputStream metadataIn;
private final DataInputStream checksumIn;
private final long visibleLength;
private final boolean partialBlk;
private byte[] outBytes;
private int bytesPerCRC = -1;
private DataChecksum.Type crcType = null;
private long crcPerBlock = -1;
private int checksumSize = -1;
private BlockMetadataHeader header;
private DataChecksum checksum;
BlockChecksumComputer(DataNode datanode,
ExtendedBlock block) throws IOException {
this.datanode = datanode;
this.block = block;
this.requestLength = block.getNumBytes();
Preconditions.checkArgument(requestLength >= 0);
this.metadataIn = datanode.data.getMetaDataInputStream(block);
this.visibleLength = datanode.data.getReplicaVisibleLength(block);
this.partialBlk = requestLength < visibleLength;
int ioFileBufferSize =
DFSUtilClient.getIoFileBufferSize(datanode.getConf());
this.checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, ioFileBufferSize));
}
protected DataNode getDatanode() {
return datanode;
}
protected ExtendedBlock getBlock() {
return block;
}
protected long getRequestLength() {
return requestLength;
}
protected LengthInputStream getMetadataIn() {
return metadataIn;
}
protected DataInputStream getChecksumIn() {
return checksumIn;
}
protected long getVisibleLength() {
return visibleLength;
}
protected boolean isPartialBlk() {
return partialBlk;
}
protected void setOutBytes(byte[] bytes) {
this.outBytes = bytes;
}
protected byte[] getOutBytes() {
return outBytes;
}
protected int getBytesPerCRC() {
return bytesPerCRC;
}
protected DataChecksum.Type getCrcType() {
return crcType;
}
protected long getCrcPerBlock() {
return crcPerBlock;
}
protected int getChecksumSize() {
return checksumSize;
}
protected BlockMetadataHeader getHeader() {
return header;
}
protected DataChecksum getChecksum() {
return checksum;
}
/**
* Perform the block checksum computing.
* @throws IOException
*/
abstract void compute() throws IOException;
/**
* Read block metadata header.
* @throws IOException
*/
protected void readHeader() throws IOException {
//read metadata file
header = BlockMetadataHeader.readHeader(checksumIn);
checksum = header.getChecksum();
checksumSize = checksum.getChecksumSize();
bytesPerCRC = checksum.getBytesPerChecksum();
crcPerBlock = checksumSize <= 0 ? 0 :
(metadataIn.getLength() -
BlockMetadataHeader.getHeaderSize()) / checksumSize;
crcType = checksum.getChecksumType();
}
/**
* Calculate partial block checksum.
* @return
* @throws IOException
*/
protected byte[] crcPartialBlock() throws IOException {
int partialLength = (int) (requestLength % bytesPerCRC);
if (partialLength > 0) {
byte[] buf = new byte[partialLength];
final InputStream blockIn = datanode.data.getBlockInputStream(block,
requestLength - partialLength);
try {
// Get the CRC of the partialLength.
IOUtils.readFully(blockIn, buf, 0, partialLength);
} finally {
IOUtils.closeStream(blockIn);
}
checksum.update(buf, 0, partialLength);
byte[] partialCrc = new byte[checksumSize];
checksum.writeValue(partialCrc, 0, true);
return partialCrc;
}
return null;
}
}
/**
* Replicated block checksum computer.
*/
static class ReplicatedBlockChecksumComputer extends BlockChecksumComputer {
ReplicatedBlockChecksumComputer(DataNode datanode,
ExtendedBlock block) throws IOException {
super(datanode, block);
}
@Override
void compute() throws IOException {
try {
readHeader();
MD5Hash md5out;
if (isPartialBlk() && getCrcPerBlock() > 0) {
md5out = checksumPartialBlock();
} else {
md5out = checksumWholeBlock();
}
setOutBytes(md5out.getDigest());
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + getBlock() + ", bytesPerCRC=" + getBytesPerCRC()
+ ", crcPerBlock=" + getCrcPerBlock() + ", md5out=" + md5out);
}
} finally {
IOUtils.closeStream(getChecksumIn());
IOUtils.closeStream(getMetadataIn());
}
}
private MD5Hash checksumWholeBlock() throws IOException {
MD5Hash md5out = MD5Hash.digest(getChecksumIn());
return md5out;
}
private MD5Hash checksumPartialBlock() throws IOException {
byte[] buffer = new byte[4*1024];
MessageDigest digester = MD5Hash.getDigester();
long remaining = (getRequestLength() / getBytesPerCRC())
* getChecksumSize();
for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
toDigest = getChecksumIn().read(buffer, 0,
(int) Math.min(remaining, buffer.length));
if (toDigest < 0) {
break;
}
digester.update(buffer, 0, toDigest);
}
byte[] partialCrc = crcPartialBlock();
if (partialCrc != null) {
digester.update(partialCrc);
}
return new MD5Hash(digester.digest());
}
}
}

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.TraceScope;
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED; import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;

View File

@ -3170,6 +3170,16 @@ public class DataNode extends ReconfigurableBase
return ecWorker; return ecWorker;
} }
IOStreamPair connectToDN(DatanodeInfo datanodeID, int timeout,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
return DFSUtilClient.connectToDN(datanodeID, timeout, conf, saslClient,
NetUtils.getDefaultSocketFactory(getConf()), false,
getDataEncryptionKeyFactoryForBlock(block), blockToken);
}
/** /**
* Get timeout value of each OOB type from configuration * Get timeout value of each OOB type from configuration
*/ */

View File

@ -17,37 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
@ -73,26 +45,52 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.BlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.BlockChecksumHelper.ReplicatedBlockChecksumComputer;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StopWatch;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.monotonicNow;
/** /**
* Thread for processing incoming/outgoing data stream. * Thread for processing incoming/outgoing data stream.
@ -886,88 +884,30 @@ class DataXceiver extends Receiver implements Runnable {
} }
} }
private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
long requestLength, DataChecksum checksum, DataInputStream checksumIn)
throws IOException {
final int bytesPerCRC = checksum.getBytesPerChecksum();
final int csize = checksum.getChecksumSize();
final byte[] buffer = new byte[4*1024];
MessageDigest digester = MD5Hash.getDigester();
long remaining = requestLength / bytesPerCRC * csize;
for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
toDigest = checksumIn.read(buffer, 0,
(int) Math.min(remaining, buffer.length));
if (toDigest < 0) {
break;
}
digester.update(buffer, 0, toDigest);
}
int partialLength = (int) (requestLength % bytesPerCRC);
if (partialLength > 0) {
byte[] buf = new byte[partialLength];
final InputStream blockIn = datanode.data.getBlockInputStream(block,
requestLength - partialLength);
try {
// Get the CRC of the partialLength.
IOUtils.readFully(blockIn, buf, 0, partialLength);
} finally {
IOUtils.closeStream(blockIn);
}
checksum.update(buf, 0, partialLength);
byte[] partialCrc = new byte[csize];
checksum.writeValue(partialCrc, 0, true);
digester.update(partialCrc);
}
return new MD5Hash(digester.digest());
}
@Override @Override
public void blockChecksum(final ExtendedBlock block, public void blockChecksum(ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException { Token<BlockTokenIdentifier> blockToken)
throws IOException {
updateCurrentThreadName("Getting checksum for block " + block); updateCurrentThreadName("Getting checksum for block " + block);
final DataOutputStream out = new DataOutputStream( final DataOutputStream out = new DataOutputStream(
getOutputStream()); getOutputStream());
checkAccess(out, true, block, blockToken, checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ); Op.BLOCK_CHECKSUM, BlockTokenIdentifier.AccessMode.READ);
// client side now can specify a range of the block for checksum
long requestLength = block.getNumBytes();
Preconditions.checkArgument(requestLength >= 0);
long visibleLength = datanode.data.getReplicaVisibleLength(block);
boolean partialBlk = requestLength < visibleLength;
final LengthInputStream metadataIn = datanode.data BlockChecksumComputer maker =
.getMetaDataInputStream(block); new ReplicatedBlockChecksumComputer(datanode, block);
final DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(metadataIn, ioFileBufferSize));
try { try {
//read metadata file maker.compute();
final BlockMetadataHeader header = BlockMetadataHeader
.readHeader(checksumIn);
final DataChecksum checksum = header.getChecksum();
final int csize = checksum.getChecksumSize();
final int bytesPerCRC = checksum.getBytesPerChecksum();
final long crcPerBlock = csize <= 0 ? 0 :
(metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
: MD5Hash.digest(checksumIn);
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
}
//write reply //write reply
BlockOpResponseProto.newBuilder() BlockOpResponseProto.newBuilder()
.setStatus(SUCCESS) .setStatus(SUCCESS)
.setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
.setBytesPerCrc(bytesPerCRC) .setBytesPerCrc(maker.getBytesPerCRC())
.setCrcPerBlock(crcPerBlock) .setCrcPerBlock(maker.getCrcPerBlock())
.setMd5(ByteString.copyFrom(md5.getDigest())) .setMd5(ByteString.copyFrom(maker.getOutBytes()))
.setCrcType(PBHelperClient.convert(checksum.getChecksumType()))) .setCrcType(PBHelperClient.convert(maker.getCrcType())))
.build() .build()
.writeDelimitedTo(out); .writeDelimitedTo(out);
out.flush(); out.flush();
@ -977,8 +917,6 @@ class DataXceiver extends Receiver implements Runnable {
throw ioe; throw ioe;
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(out);
IOUtils.closeStream(checksumIn);
IOUtils.closeStream(metadataIn);
} }
//update metrics //update metrics
@ -1276,7 +1214,7 @@ class DataXceiver extends Receiver implements Runnable {
/** /**
* Wait until the BP is registered, upto the configured amount of time. * Wait until the BP is registered, upto the configured amount of time.
* Throws an exception if times out, which should fail the client request. * Throws an exception if times out, which should fail the client request.
* @param the requested block * @param block requested block
*/ */
void checkAndWaitForBP(final ExtendedBlock block) void checkAndWaitForBP(final ExtendedBlock block)
throws IOException { throws IOException {