HDFS-4403. DFSClient can infer checksum type when not provided by reading first byte. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1436730 13f79535-47bb-0310-9956-ffa450edef68
commit cfae13306a (parent c5b01ce424)
Author: Todd Lipcon
Date:   2013-01-22 02:59:25 +00:00

4 changed files with 131 additions and 51 deletions
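The change in one sentence: when an OP_BLOCK_CHECKSUM reply from a DataNode carries no crcType field (replies from DataNodes older than this patch do not), the client no longer assumes CRC32; it opens the replica for a single byte with OP_READ_BLOCK and takes the checksum type from the read response instead. The sketch below is a minimal, self-contained model of that decision; every type in it is a stand-in invented for illustration, and the real logic lives in DFSClient.getFileChecksum() and inferChecksumTypeByReading() in the diff that follows.

// ChecksumTypeFallbackSketch.java -- illustrative stand-ins only, not HDFS code.
public class ChecksumTypeFallbackSketch {
  enum ChecksumType { CRC32, CRC32C }

  // Stand-in for the optional crcType field of OpBlockChecksumResponseProto.
  static class ChecksumReply {
    private final ChecksumType crcType;           // null models an unset field
    ChecksumReply(ChecksumType t) { crcType = t; }
    boolean hasCrcType() { return crcType != null; }
    ChecksumType getCrcType() { return crcType; }
  }

  // Models the one-byte OP_READ_BLOCK probe: a real client reads bytes [0, 1)
  // of the replica and pulls the type out of the read op's checksum info.
  static ChecksumType inferByReadingFirstByte() {
    return ChecksumType.CRC32C;                   // whatever the DataNode reports
  }

  static ChecksumType resolve(ChecksumReply reply) {
    return reply.hasCrcType() ? reply.getCrcType()
                              : inferByReadingFirstByte();
  }

  public static void main(String[] args) {
    System.out.println(resolve(new ChecksumReply(ChecksumType.CRC32))); // CRC32
    System.out.println(resolve(new ChecksumReply(null)));               // CRC32C
  }
}

Run it and the first call reports CRC32 straight from the reply, while the second falls back to the probe.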

--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -494,6 +494,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4393. Make empty request and responses in protocol translators can be
     static final members. (Brandon Li via suresh)
 
+    HDFS-4403. DFSClient can infer checksum type when not provided by reading
+    first byte (todd)
+
   OPTIMIZATIONS
 
     HDFS-3429. DataNode reads checksums even if client does not need them (todd)

--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -150,6 +150,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
@@ -1562,7 +1563,7 @@ public class DFSClient implements java.io.Closeable {
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory,
+    return getFileChecksum(src, clientName, namenode, socketFactory,
         dfsClientConf.socketTimeout, getDataEncryptionKey(),
         dfsClientConf.connectToDnViaHostname);
   }
@@ -1605,9 +1606,16 @@ public class DFSClient implements java.io.Closeable {
   /**
    * Get the checksum of a file.
    * @param src The file path
+   * @param clientName the name of the client requesting the checksum.
+   * @param namenode the RPC proxy for the namenode
+   * @param socketFactory to create sockets to connect to DNs
+   * @param socketTimeout timeout to use when connecting and waiting for a response
+   * @param encryptionKey the key needed to communicate with DNs in this cluster
+   * @param connectToDnViaHostname {@see #connectToDnViaHostname()}
    * @return The checksum
    */
-  public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+  static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+      String clientName,
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
       DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
       throws IOException {
@@ -1642,32 +1650,16 @@ public class DFSClient implements java.io.Closeable {
       final int timeout = 3000 * datanodes.length + socketTimeout;
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
-        Socket sock = null;
         DataOutputStream out = null;
         DataInputStream in = null;
 
         try {
           //connect to a datanode
-          sock = socketFactory.createSocket();
-          String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Connecting to datanode " + dnAddr);
-          }
-          NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
-          sock.setSoTimeout(timeout);
-
-          OutputStream unbufOut = NetUtils.getOutputStream(sock);
-          InputStream unbufIn = NetUtils.getInputStream(sock);
-          if (encryptionKey != null) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufOut, unbufIn, encryptionKey);
-            unbufOut = encryptedStreams.out;
-            unbufIn = encryptedStreams.in;
-          }
-          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+          IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+              encryptionKey, datanodes[j], timeout);
+          out = new DataOutputStream(new BufferedOutputStream(pair.out,
               HdfsConstants.SMALL_BUFFER_SIZE));
-          in = new DataInputStream(unbufIn);
+          in = new DataInputStream(pair.in);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j] + ": "
@@ -1680,19 +1672,8 @@ public class DFSClient implements java.io.Closeable {
               BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
 
           if (reply.getStatus() != Status.SUCCESS) {
-            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
-                && i > lastRetriedIndex) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
-                    + "for file " + src + " for block " + block
-                    + " from datanode " + datanodes[j]
-                    + ". Will retry the block once.");
-              }
-              lastRetriedIndex = i;
-              done = true; // actually it's not done; but we'll retry
-              i--; // repeat at i-th block
-              refetchBlocks = true;
-              break;
+            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException();
             } else {
               throw new IOException("Bad response " + reply + " for block "
                   + block + " from datanode " + datanodes[j]);
@@ -1724,8 +1705,18 @@ public class DFSClient implements java.io.Closeable {
           md5.write(md5out);
 
           // read crc-type
-          final DataChecksum.Type ct = PBHelper.convert(checksumData
-              .getCrcType());
+          final DataChecksum.Type ct;
+          if (checksumData.hasCrcType()) {
+            ct = PBHelper.convert(checksumData
+                .getCrcType());
+          } else {
+            LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
+                      "inferring checksum by reading first byte");
+            ct = inferChecksumTypeByReading(
+                clientName, socketFactory, socketTimeout, lb, datanodes[j],
+                encryptionKey, connectToDnViaHostname);
+          }
+
           if (i == 0) { // first block
             crcType = ct;
           } else if (crcType != DataChecksum.Type.MIXED
@@ -1743,12 +1734,25 @@ public class DFSClient implements java.io.Closeable {
             }
             LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
           }
+        } catch (InvalidBlockTokenException ibte) {
+          if (i > lastRetriedIndex) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                  + "for file " + src + " for block " + block
+                  + " from datanode " + datanodes[j]
+                  + ". Will retry the block once.");
+            }
+            lastRetriedIndex = i;
+            done = true; // actually it's not done; but we'll retry
+            i--; // repeat at i-th block
+            refetchBlocks = true;
+            break;
+          }
         } catch (IOException ie) {
           LOG.warn("src=" + src + ", datanodes["+j+"]=" + datanodes[j], ie);
         } finally {
           IOUtils.closeStream(in);
           IOUtils.closeStream(out);
-          IOUtils.closeSocket(sock);
         }
       }
@@ -1780,6 +1784,90 @@ public class DFSClient implements java.io.Closeable {
     }
   }
 
+  /**
+   * Connect to the given datanode's datantrasfer port, and return
+   * the resulting IOStreamPair. This includes encryption wrapping, etc.
+   */
+  private static IOStreamPair connectToDN(
+      SocketFactory socketFactory, boolean connectToDnViaHostname,
+      DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
+      throws IOException
+  {
+    boolean success = false;
+    Socket sock = null;
+    try {
+      sock = socketFactory.createSocket();
+      String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to datanode " + dnAddr);
+      }
+      NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
+      sock.setSoTimeout(timeout);
+
+      OutputStream unbufOut = NetUtils.getOutputStream(sock);
+      InputStream unbufIn = NetUtils.getInputStream(sock);
+      IOStreamPair ret;
+      if (encryptionKey != null) {
+        ret = DataTransferEncryptor.getEncryptedStreams(
+            unbufOut, unbufIn, encryptionKey);
+      } else {
+        ret = new IOStreamPair(unbufIn, unbufOut);
+      }
+      success = true;
+      return ret;
+    } finally {
+      if (!success) {
+        IOUtils.closeSocket(sock);
+      }
+    }
+  }
+
+  /**
+   * Infer the checksum type for a replica by sending an OP_READ_BLOCK
+   * for the first byte of that replica. This is used for compatibility
+   * with older HDFS versions which did not include the checksum type in
+   * OpBlockChecksumResponseProto.
+   *
+   * @param in input stream from datanode
+   * @param out output stream to datanode
+   * @param lb the located block
+   * @param clientName the name of the DFSClient requesting the checksum
+   * @param dn the connected datanode
+   * @return the inferred checksum type
+   * @throws IOException if an error occurs
+   */
+  private static Type inferChecksumTypeByReading(
+      String clientName, SocketFactory socketFactory, int socketTimeout,
+      LocatedBlock lb, DatanodeInfo dn,
+      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+      throws IOException {
+    IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
+        encryptionKey, dn, socketTimeout);
+
+    try {
+      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
+          HdfsConstants.SMALL_BUFFER_SIZE));
+      DataInputStream in = new DataInputStream(pair.in);
+
+      new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
+      final BlockOpResponseProto reply =
+          BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
+
+      if (reply.getStatus() != Status.SUCCESS) {
+        if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+          throw new InvalidBlockTokenException();
+        } else {
+          throw new IOException("Bad response " + reply + " trying to read "
+              + lb.getBlock() + " from datanode " + dn);
+        }
+      }
+
+      return PBHelper.convert(reply.getReadOpChecksumInfo().getChecksum().getType());
+    } finally {
+      IOUtils.cleanup(null, pair.in, pair.out);
+    }
+  }
+
   /**
    * Set permissions to a file or directory.
    * @param src path name.
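One consequence of the restructuring above worth calling out: the access-token failure, previously handled inline in the status branch, now surfaces as an InvalidBlockTokenException, so a single catch clause implements the retry-once policy for both the checksum request and the new read-based probe. Below is a self-contained sketch of that pattern, with illustrative names only; the real catch block additionally sets refetchBlocks and breaks out of the datanode loop.

// RetryOnceSketch.java -- illustrative names only; mirrors the catch block above.
public class RetryOnceSketch {
  static class InvalidBlockTokenException extends java.io.IOException {}

  private static int attempts = 0;

  // Fails with a stale-token error on the first attempt, succeeds after that.
  static void requestChecksum(int block) throws InvalidBlockTokenException {
    if (attempts++ == 0) {
      throw new InvalidBlockTokenException();
    }
  }

  public static void main(String[] args) {
    int lastRetriedIndex = -1;
    for (int i = 0; i < 1; i++) {
      try {
        requestChecksum(i);
        System.out.println("block " + i + ": checksum ok");
      } catch (InvalidBlockTokenException e) {
        if (i > lastRetriedIndex) { // retry each block at most once
          lastRetriedIndex = i;
          i--;                      // loop increment brings us back to block i
        }
      }
    }
  }
}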

--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileChecksumServlets.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;
 
-import javax.net.SocketFactory;
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -33,14 +32,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeJspHelper;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;
 import org.znerd.xmlenc.XMLOutputter;
@@ -116,18 +112,11 @@ public class FileChecksumServlets {
       final DataNode datanode = (DataNode) context.getAttribute("datanode");
       final Configuration conf =
         new HdfsConfiguration(datanode.getConf());
-      final int socketTimeout = conf.getInt(
-          DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
-          HdfsServerConstants.READ_TIMEOUT);
-      final SocketFactory socketFactory = NetUtils.getSocketFactory(conf,
-          ClientProtocol.class);
 
       try {
         final DFSClient dfs = DatanodeJspHelper.getDFSClient(request,
             datanode, conf, getUGI(request, conf));
-        final ClientProtocol nnproxy = dfs.getNamenode();
-        final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
-            path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
+        final MD5MD5CRC32FileChecksum checksum = dfs.getFileChecksum(path);
         MD5MD5CRC32FileChecksum.write(xml, checksum);
       } catch(IOException ioe) {
         writeXml(ioe, path, xml);

--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -183,5 +183,5 @@ message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
   required bytes md5 = 3;
-  optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32];
+  optional ChecksumTypeProto crcType = 4;
 }
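Why drop [default = CHECKSUM_CRC32] rather than keep it? In proto2, hasCrcType() is false for an unset optional field either way, but a declared default makes getCrcType() silently answer CHECKSUM_CRC32 for replies from old DataNodes, which is exactly the wrong answer when the replica was written with CRC32C. The hedged snippet below models that behavior; it assumes the protobuf-java generated class DataTransferProtos.OpBlockChecksumResponseProto is on the classpath, and the field values are chosen purely for illustration.

// CrcTypeFieldDemo.java -- sketch, assuming the generated DataTransferProtos
// classes from this .proto file are available.
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;

public class CrcTypeFieldDemo {
  public static void main(String[] args) {
    // Build a reply the way a pre-HDFS-4403 DataNode effectively does:
    // all required fields set, the optional crcType left unset.
    OpBlockChecksumResponseProto reply = OpBlockChecksumResponseProto.newBuilder()
        .setBytesPerCrc(512)
        .setCrcPerBlock(8)
        .setMd5(ByteString.copyFrom(new byte[16]))  // placeholder digest
        .build();

    // False for an unset optional field whether or not a default is declared;
    // the patched client branches on exactly this.
    System.out.println(reply.hasCrcType()); // false

    // With the old [default = CHECKSUM_CRC32] declaration, getCrcType() here
    // would have reported CHECKSUM_CRC32, indistinguishable from a modern
    // DataNode that really computed CRC32, and wrong for CRC32C replicas.
    System.out.println(reply.getCrcType());
  }
}

The patched DFSClient only trusts getCrcType() when the server actually set the field, and probes with a one-byte read otherwise.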