HDFS-4403. DFSClient can infer checksum type when not provided by reading first byte. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1436731 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2013-01-22 02:59:32 +00:00
parent 361e05864f
commit ddeb98f4fd
4 changed files with 131 additions and 51 deletions

View File

@ -205,6 +205,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4393. Make empty request and responses in protocol translators can be
static final members. (Brandon Li via suresh)
HDFS-4403. DFSClient can infer checksum type when not provided by reading
first byte (todd)
OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd)

View File

@ -152,6 +152,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
@ -1589,7 +1590,7 @@ public class DFSClient implements java.io.Closeable {
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
return getFileChecksum(src, namenode, socketFactory,
return getFileChecksum(src, clientName, namenode, socketFactory,
dfsClientConf.socketTimeout, getDataEncryptionKey(),
dfsClientConf.connectToDnViaHostname);
}
@ -1632,9 +1633,16 @@ public class DFSClient implements java.io.Closeable {
/**
* Get the checksum of a file.
* @param src The file path
* @param clientName the name of the client requesting the checksum.
* @param namenode the RPC proxy for the namenode
* @param socketFactory to create sockets to connect to DNs
* @param socketTimeout timeout to use when connecting and waiting for a response
* @param encryptionKey the key needed to communicate with DNs in this cluster
* @param connectToDnViaHostname {@see #connectToDnViaHostname()}
* @return The checksum
*/
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
static MD5MD5CRC32FileChecksum getFileChecksum(String src,
String clientName,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException {
@ -1669,32 +1677,16 @@ public class DFSClient implements java.io.Closeable {
final int timeout = 3000 * datanodes.length + socketTimeout;
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
//connect to a datanode
sock = socketFactory.createSocket();
String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
if (encryptionKey != null) {
IOStreamPair encryptedStreams =
DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, encryptionKey);
unbufOut = encryptedStreams.out;
unbufIn = encryptedStreams.in;
}
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
encryptionKey, datanodes[j], timeout);
out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
in = new DataInputStream(pair.in);
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j] + ": "
@ -1707,19 +1699,8 @@ public class DFSClient implements java.io.Closeable {
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
&& i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file " + src + " for block " + block
+ " from datanode " + datanodes[j]
+ ". Will retry the block once.");
}
lastRetriedIndex = i;
done = true; // actually it's not done; but we'll retry
i--; // repeat at i-th block
refetchBlocks = true;
break;
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException();
} else {
throw new IOException("Bad response " + reply + " for block "
+ block + " from datanode " + datanodes[j]);
@ -1751,8 +1732,18 @@ public class DFSClient implements java.io.Closeable {
md5.write(md5out);
// read crc-type
final DataChecksum.Type ct = PBHelper.convert(checksumData
.getCrcType());
final DataChecksum.Type ct;
if (checksumData.hasCrcType()) {
ct = PBHelper.convert(checksumData
.getCrcType());
} else {
LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
"inferring checksum by reading first byte");
ct = inferChecksumTypeByReading(
clientName, socketFactory, socketTimeout, lb, datanodes[j],
encryptionKey, connectToDnViaHostname);
}
if (i == 0) { // first block
crcType = ct;
} else if (crcType != DataChecksum.Type.MIXED
@ -1770,12 +1761,25 @@ public class DFSClient implements java.io.Closeable {
}
LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
}
} catch (InvalidBlockTokenException ibte) {
if (i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file " + src + " for block " + block
+ " from datanode " + datanodes[j]
+ ". Will retry the block once.");
}
lastRetriedIndex = i;
done = true; // actually it's not done; but we'll retry
i--; // repeat at i-th block
refetchBlocks = true;
break;
}
} catch (IOException ie) {
LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
IOUtils.closeSocket(sock);
}
}
@ -1807,6 +1811,90 @@ public class DFSClient implements java.io.Closeable {
}
}
/**
* Connect to the given datanode's datantrasfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
private static IOStreamPair connectToDN(
SocketFactory socketFactory, boolean connectToDnViaHostname,
DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
throws IOException
{
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair ret;
if (encryptionKey != null) {
ret = DataTransferEncryptor.getEncryptedStreams(
unbufOut, unbufIn, encryptionKey);
} else {
ret = new IOStreamPair(unbufIn, unbufOut);
}
success = true;
return ret;
} finally {
if (!success) {
IOUtils.closeSocket(sock);
}
}
}
/**
* Infer the checksum type for a replica by sending an OP_READ_BLOCK
* for the first byte of that replica. This is used for compatibility
* with older HDFS versions which did not include the checksum type in
* OpBlockChecksumResponseProto.
*
* @param in input stream from datanode
* @param out output stream to datanode
* @param lb the located block
* @param clientName the name of the DFSClient requesting the checksum
* @param dn the connected datanode
* @return the inferred checksum type
* @throws IOException if an error occurs
*/
private static Type inferChecksumTypeByReading(
String clientName, SocketFactory socketFactory, int socketTimeout,
LocatedBlock lb, DatanodeInfo dn,
DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
throws IOException {
IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
encryptionKey, dn, socketTimeout);
try {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
HdfsConstants.SMALL_BUFFER_SIZE));
DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
if (reply.getStatus() != Status.SUCCESS) {
if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException();
} else {
throw new IOException("Bad response " + reply + " trying to read "
+ lb.getBlock() + " from datanode " + dn);
}
}
return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
} finally {
IOUtils.cleanup(null, pair.in, pair.out);
}
}
/**
* Set permissions to a file or directory.
* @param src path name.

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.net.URL;
import javax.net.SocketFactory;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -33,14 +32,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ServletUtil;
import org.znerd.xmlenc.XMLOutputter;
@ -116,18 +112,11 @@ public class FileChecksumServlets {
final DataNode datanode = (DataNode) context.getAttribute("datanode");
final Configuration conf =
new HdfsConfiguration(datanode.getConf());
final int socketTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
ClientProtocol.class);
try {
final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) {
writeXml(ioe, path, xml);

View File

@ -183,5 +183,5 @@ message OpBlockChecksumResponseProto {
required uint32 bytesPerCrc = 1;
required uint64 crcPerBlock = 2;
required bytes md5 = 3;
optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32];
optional ChecksumTypeProto crcType = 4;
}