From cfae13306ac0fb3f3c139d5ac511bf78cede1b77 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Tue, 22 Jan 2013 02:59:25 +0000 Subject: [PATCH] HDFS-4403. DFSClient can infer checksum type when not provided by reading first byte. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1436730 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSClient.java | 164 ++++++++++++++---- .../server/namenode/FileChecksumServlets.java | 13 +- .../src/main/proto/datatransfer.proto | 2 +- 4 files changed, 131 insertions(+), 51 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6bac5af84d7..7467cde47a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -494,6 +494,9 @@ Release 2.0.3-alpha - Unreleased HDFS-4393. Make empty request and responses in protocol translators can be static final members. (Brandon Li via suresh) + HDFS-4403. DFSClient can infer checksum type when not provided by reading + first byte (todd) + OPTIMIZATIONS HDFS-3429. DataNode reads checksums even if client does not need them (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 9fa98fdd077..ca1a13aad6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -150,6 +150,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; @@ -1562,7 +1563,7 @@ public class DFSClient implements java.io.Closeable { */ public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException { checkOpen(); - return getFileChecksum(src, namenode, socketFactory, + return getFileChecksum(src, clientName, namenode, socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(), dfsClientConf.connectToDnViaHostname); } @@ -1605,9 +1606,16 @@ public class DFSClient implements java.io.Closeable { /** * Get the checksum of a file. * @param src The file path + * @param clientName the name of the client requesting the checksum. 
+ * @param namenode the RPC proxy for the namenode
+ * @param socketFactory to create sockets to connect to DNs
+ * @param socketTimeout timeout to use when connecting and waiting for a response
+ * @param encryptionKey the key needed to communicate with DNs in this cluster
+ * @param connectToDnViaHostname whether to connect to the datanodes via hostname
 * @return The checksum
 */
- public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ String clientName,
 ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
 DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
 throws IOException {
@@ -1642,32 +1650,16 @@
 final int timeout = 3000 * datanodes.length + socketTimeout;
 boolean done = false;
 for(int j = 0; !done && j < datanodes.length; j++) {
- Socket sock = null;
 DataOutputStream out = null;
 DataInputStream in = null;

 try {
 //connect to a datanode
- sock = socketFactory.createSocket();
- String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to datanode " + dnAddr);
- }
- NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
- sock.setSoTimeout(timeout);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock);
- InputStream unbufIn = NetUtils.getInputStream(sock);
- if (encryptionKey != null) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn, encryptionKey);
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+ encryptionKey, datanodes[j], timeout);
+ out = new DataOutputStream(new BufferedOutputStream(pair.out,
 HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(unbufIn);
+ in = new DataInputStream(pair.in);

 if (LOG.isDebugEnabled()) {
 LOG.debug("write to " + datanodes[j] + ": "
@@ -1680,19 +1672,8 @@
 BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));

 if (reply.getStatus() != Status.SUCCESS) {
- if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
- && i > lastRetriedIndex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
- + "for file " + src + " for block " + block
- + " from datanode " + datanodes[j]
- + ". Will retry the block once.");
- }
- lastRetriedIndex = i;
- done = true; // actually it's not done; but we'll retry
- i--; // repeat at i-th block
- refetchBlocks = true;
- break;
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException();
 } else {
 throw new IOException("Bad response " + reply + " for block "
 + block + " from datanode " + datanodes[j]);
@@ -1724,8 +1705,18 @@
 md5.write(md5out);

 // read crc-type
- final DataChecksum.Type ct = PBHelper.convert(checksumData
- .getCrcType());
+ final DataChecksum.Type ct;
+ if (checksumData.hasCrcType()) {
+ ct = PBHelper.convert(checksumData
+ .getCrcType());
+ } else {
+ LOG.debug("Retrieving checksum from an earlier-version DataNode: "
+ + "inferring checksum by reading first byte");
+ ct = inferChecksumTypeByReading(
+ clientName, socketFactory, socketTimeout, lb, datanodes[j],
+ encryptionKey, connectToDnViaHostname);
+ }
+
 if (i == 0) { // first block
 crcType = ct;
 } else if (crcType != DataChecksum.Type.MIXED
@@ -1743,12 +1734,25 @@
 }
 LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
 }
+ } catch (InvalidBlockTokenException ibte) {
+ if (i > lastRetriedIndex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ + "for file " + src + " for block " + block
+ + " from datanode " + datanodes[j]
+ + ". Will retry the block once.");
+ }
+ lastRetriedIndex = i;
+ done = true; // actually it's not done; but we'll retry
+ i--; // repeat at i-th block
+ refetchBlocks = true;
+ break;
+ }
 } catch (IOException ie) {
 LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
 } finally {
 IOUtils.closeStream(in);
 IOUtils.closeStream(out);
- IOUtils.closeSocket(sock);
 }
 }
@@ -1780,6 +1784,90 @@
 }
 }

+ /**
+ * Connect to the given datanode's data transfer port, and return
+ * the resulting IOStreamPair. This includes encryption wrapping, etc.
+ */
+ private static IOStreamPair connectToDN(
+ SocketFactory socketFactory, boolean connectToDnViaHostname,
+ DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
+ throws IOException
+ {
+ boolean success = false;
+ Socket sock = null;
+ try {
+ sock = socketFactory.createSocket();
+ String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+ sock.setSoTimeout(timeout);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ IOStreamPair ret;
+ if (encryptionKey != null) {
+ ret = DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, encryptionKey);
+ } else {
+ ret = new IOStreamPair(unbufIn, unbufOut);
+ }
+ success = true;
+ return ret;
+ } finally {
+ if (!success) {
+ IOUtils.closeSocket(sock);
+ }
+ }
+ }
+
+ /**
+ * Infer the checksum type for a replica by sending an OP_READ_BLOCK
+ * for the first byte of that replica. This is used for compatibility
+ * with older HDFS versions which did not include the checksum type in
+ * OpBlockChecksumResponseProto.
+ *
+ * @param clientName the name of the DFSClient requesting the checksum
+ * @param socketFactory to create sockets to connect to DNs
+ * @param socketTimeout timeout to use when connecting and waiting for a response
+ * @param lb the located block
+ * @param dn the datanode to read from
+ * @param encryptionKey the key needed to communicate with DNs in this cluster
+ * @param connectToDnViaHostname whether to connect to the datanode via hostname
+ * @return the inferred checksum type
+ * @throws IOException if an error occurs
+ */
+ private static Type inferChecksumTypeByReading(
+ String clientName, SocketFactory socketFactory, int socketTimeout,
+ LocatedBlock lb, DatanodeInfo dn,
+ DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ throws IOException {
+ IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+ encryptionKey, dn, socketTimeout);
+
+ try {
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ DataInputStream in = new DataInputStream(pair.in);
+
+ new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+
+ if (reply.getStatus() != Status.SUCCESS) {
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException();
+ } else {
+ throw new IOException("Bad response " + reply + " trying to read "
+ + lb.getBlock() + " from datanode " + dn);
+ }
+ }
+
+ return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+ } finally {
+ IOUtils.cleanup(null, pair.in, pair.out);
+ }
+ }
+
 /**
 * Set permissions to a file or directory.
 * @param src path name.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
index 40286ec8ed6..5c9d164e2ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;

-import javax.net.SocketFactory;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -33,14 +32,11 @@
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 import org.znerd.xmlenc.XMLOutputter;
@@ -116,18 +112,11 @@
 final DataNode datanode = (DataNode) context.getAttribute("datanode");
 final Configuration conf = new HdfsConfiguration(datanode.getConf());
- final int socketTimeout = conf.getInt(
- DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
- HdfsServerConstants.READ_TIMEOUT);
- final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
- ClientProtocol.class);

 try {
 final DFSClient dfs =
DatanodeJspHelper.getDFSClient(request, datanode, conf, getUGI(request, conf)); - final ClientProtocol nnproxy = dfs.getNamenode(); - final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum( - path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false); + final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path); MD5MD5CRC32FileChecksum.write(xml, checksum); } catch(IOException ioe) { writeXml(ioe, path, xml); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index d97bd7daee1..0e78e7b3d58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -183,5 +183,5 @@ message OpBlockChecksumResponseProto { required uint32 bytesPerCrc = 1; required uint64 crcPerBlock = 2; required bytes md5 = 3; - optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32]; + optional ChecksumTypeProto crcType = 4; }
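
The compatibility mechanism above hinges on one decision: a DataNode carrying this change sets crcType in OpBlockChecksumResponseProto, while an older DataNode leaves the field unset, and the client then infers the type by reading a single byte of the replica over the normal OP_READ_BLOCK path. A minimal sketch of that decision, using the generated proto API but with a hypothetical inferByReadingFirstByte() stand-in for the full inferChecksumTypeByReading() plumbing:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.util.DataChecksum;

class ChecksumTypeFallback {
  // Prefer the type carried in the response; fall back to inference when
  // an earlier-version DataNode did not set the optional crcType field.
  static DataChecksum.Type resolveCrcType(OpBlockChecksumResponseProto resp)
      throws IOException {
    if (resp.hasCrcType()) {
      return PBHelper.convert(resp.getCrcType());
    }
    // Pre-HDFS-4403 DataNode: read the first byte of the replica and take
    // the checksum type from the read response instead.
    return inferByReadingFirstByte(); // hypothetical stand-in
  }

  private static DataChecksum.Type inferByReadingFirstByte() {
    throw new UnsupportedOperationException("see inferChecksumTypeByReading");
  }
}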
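The extracted connectToDN() is also a small study in resource handling: the success flag flips only after the stream pair is fully constructed, so the finally clause closes the socket on every failure path but leaves it open once ownership passes to the caller. The same idiom in isolation, with plain java.net types and no HDFS dependencies:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class CleanupOnFailure {
  // Open and configure a socket, closing it if anything fails before the
  // caller can take ownership.
  static Socket connect(InetSocketAddress addr, int timeoutMs) throws IOException {
    boolean success = false;
    Socket sock = new Socket();
    try {
      sock.connect(addr, timeoutMs); // may throw; the socket must not leak
      sock.setSoTimeout(timeoutMs);  // may throw as well
      success = true;                // past this point the caller owns it
      return sock;
    } finally {
      if (!success) {
        sock.close();
      }
    }
  }
}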
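Finally, the datatransfer.proto tweak keeps the hasCrcType() check honest. Under protobuf 2, hasCrcType() reports whether the sender actually set the field either way; dropping [default = CHECKSUM_CRC32] additionally stops getCrcType() from implying a CRC32 answer for a response that carried no type at all. A small illustration against the generated builder API (the field values here are made up for the example):

import com.google.protobuf.ByteString;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;

class CrcTypePresence {
  public static void main(String[] args) {
    // A response as a pre-HDFS-4403 DataNode would build it: required
    // fields set, crcType absent.
    OpBlockChecksumResponseProto legacy = OpBlockChecksumResponseProto.newBuilder()
        .setBytesPerCrc(512)
        .setCrcPerBlock(1)
        .setMd5(ByteString.copyFrom(new byte[16]))
        .build();
    System.out.println(legacy.hasCrcType());  // false -> client must infer

    // The same response from an upgraded DataNode.
    OpBlockChecksumResponseProto current = legacy.toBuilder()
        .setCrcType(ChecksumTypeProto.CHECKSUM_CRC32C)
        .build();
    System.out.println(current.hasCrcType()); // true -> use it directly
  }
}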