HDFS-3429. DataNode reads checksums even if client does not need them. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1433116 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2013-01-14 20:44:29 +00:00
parent 2fa1932e9d
commit 034ec8af5a
15 changed files with 101 additions and 42 deletions


@@ -193,6 +193,8 @@ Release 2.0.3-alpha - Unreleased
OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
BUG FIXES
HDFS-3919. MiniDFSCluster:waitClusterUp can hang forever.


@@ -380,7 +380,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
//
// Get bytes in block, set streams


@@ -392,7 +392,8 @@ public class RemoteBlockReader2 implements BlockReader {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
//
// Get bytes in block


@@ -55,12 +55,15 @@ public interface DataTransferProtocol {
* @param clientName client's name.
* @param blockOffset offset of the block.
* @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending
* checksums
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException;
final long length,
final boolean sendChecksum) throws IOException;
/**
* Write a block to a datanode pipeline.

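The readBlock change above is the core of the patch: the client now tells the datanode whether it wants checksums at all. As a hedged sketch of how the flag is exercised end to end (standard FileSystem API, which the test changes further down also rely on; the path and class name here are illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NoChecksumReadSketch {
      public static void main(String[] args) throws Exception {
        // Sketch only: with verification disabled on the client, the block
        // reader issues readBlock(..., sendChecksum=false) and the datanode
        // skips reading and transmitting checksum data for this stream.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.setVerifyChecksum(false);   // client opts out of checksum verification
        FSDataInputStream in = fs.open(new Path("/data/example.dat"));
        byte[] buf = new byte[4096];
        int n = in.read(buf);          // no checksum I/O on the datanode side
        System.out.println("read " + n + " bytes without checksum verification");
        in.close();
      }
    }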

@@ -88,7 +88,8 @@ public abstract class Receiver implements DataTransferProtocol {
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen());
proto.getLen(),
proto.getSendChecksums());
}
/** Receive OP_WRITE_BLOCK */


@@ -62,6 +62,10 @@ public class Sender implements DataTransferProtocol {
private static void send(final DataOutputStream out, final Op opcode,
final Message proto) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
+ ": " + proto);
}
op(out, opcode);
proto.writeDelimitedTo(out);
out.flush();
@@ -72,12 +76,14 @@ public class Sender implements DataTransferProtocol {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
final long length,
final boolean sendChecksum) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.build();
send(out, Op.READ_BLOCK, proto);

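Separately from the protocol change, the Sender hunk above adds TRACE-level logging of every outgoing DataTransferOp. A hedged sketch of enabling it from test or debugging code, following the same pattern the modified TestPread uses further down:

    import org.apache.commons.logging.impl.Log4JLogger;
    import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
    import org.apache.log4j.Level;

    public class DataTransferTraceSketch {
      // Sketch: raising DataTransferProtocol.LOG to ALL makes the new
      // "Sending DataTransferOp ..." lines from Sender visible.
      public static void enableTrace() {
        ((Log4JLogger) DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
      }
    }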

@@ -388,8 +388,8 @@ class BlockPoolSliceScanner {
try {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, datanode,
null);
blockSender = new BlockSender(block, 0, -1, false, true, true,
datanode, null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());


@@ -45,6 +45,8 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
/**
* Reads a block from the disk and sends it to a recipient.
*
@@ -158,12 +160,14 @@ class BlockSender implements java.io.Closeable {
* @param length length of data to read
* @param corruptChecksumOk
* @param verifyChecksum verify checksum while reading the data
* @param sendChecksum send checksum to client.
* @param datanode datanode from which the block is being read
* @param clientTraceFmt format string used to print client trace logs
* @throws IOException
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum,
DataNode datanode, String clientTraceFmt)
throws IOException {
try {
@@ -175,6 +179,13 @@ class BlockSender implements java.io.Closeable {
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
this.datanode = datanode;
if (verifyChecksum) {
// To simplify implementation, callers may not specify verification
// without sending.
Preconditions.checkArgument(sendChecksum,
"If verifying checksum, currently must also send it.");
}
final Replica replica;
final long replicaVisibleLength;
synchronized(datanode.data) {
@@ -213,29 +224,37 @@
* False, True: will verify checksum
* False, False: throws IOException file not found
*/
DataChecksum csum;
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " + block);
}
checksumIn = new DataInputStream(
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " + block);
}
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
checksumIn = new DataInputStream(
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
}
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
}
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
// This only decides the buffer size. Use BUFFER_SIZE?
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 16 * 1024);
}
if (csum == null) {
// The number of bytes per checksum here determines the alignment
// of reads: we always start reading at a checksum chunk boundary,
// even if the checksum type is NULL. So, choosing too big of a value
// would risk sending too much unnecessary data. 512 (1 disk sector)
// is likely to result in minimal extra IO.
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
}
/*

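Distilled, the BlockSender change above only opens the block's meta file when a checksum will actually be verified or sent, and otherwise falls back to a NULL checksum. A rough sketch of that decision, not the patch verbatim; readChecksumFromMetaFile is a hypothetical stand-in for the BlockMetadataHeader handling shown in the hunk:

    import com.google.common.base.Preconditions;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumChoiceSketch {
      static DataChecksum chooseChecksum(boolean verifyChecksum, boolean sendChecksum) {
        if (verifyChecksum) {
          // The patch keeps things simple: verifying implies sending.
          Preconditions.checkArgument(sendChecksum,
              "If verifying checksum, currently must also send it.");
        }
        DataChecksum csum = null;
        if (verifyChecksum || sendChecksum) {
          csum = readChecksumFromMetaFile();   // hypothetical helper
        }
        if (csum == null) {
          // A NULL checksum still fixes the chunk size that aligns disk reads;
          // 512 bytes (one sector) keeps the extra data per request minimal.
          csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
        }
        return csum;
      }

      static DataChecksum readChecksumFromMetaFile() {
        // Placeholder: the real code reads the checksum from BlockMetadataHeader.
        return null;
      }
    }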

@@ -1441,7 +1441,7 @@ public class DataNode extends Configured
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, DataNode.this, null);
false, false, true, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//


@@ -241,7 +241,8 @@ class DataXceiver extends Receiver implements Runnable {
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
final long length,
final boolean sendChecksum) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = getOutputStream();
@@ -266,7 +267,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, datanode, clientTraceFmt);
true, false, sendChecksum, datanode, clientTraceFmt);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@@ -654,7 +655,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, datanode,
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
null);
// set up response stream


@@ -52,6 +52,7 @@ message OpReadBlockProto {
required ClientOperationHeaderProto header = 1;
required uint64 offset = 2;
required uint64 len = 3;
optional bool sendChecksums = 4 [default = true];
}

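Declaring the new field optional with [default = true] keeps the wire format compatible with older clients: when the field is absent, protobuf supplies true and the datanode behaves exactly as before. A minimal sketch of the receiving side, mirroring the Receiver change earlier in the patch (the generated class is assumed here to live in DataTransferProtos):

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;

    public class SendChecksumsDefaultSketch {
      // Sketch: requests from pre-HDFS-3429 clients omit sendChecksums,
      // so getSendChecksums() returns the declared default of true.
      static boolean shouldSendChecksums(OpReadBlockProto proto) {
        return proto.getSendChecksums();
      }
    }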

@@ -450,21 +450,21 @@ public class TestDataTransferProtocol {
recvBuf.reset();
blk.setBlockId(blkid-1);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
0L, fileLen, true);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-1L, fileLen);
-1L, fileLen, true);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
fileLen, fileLen);
fileLen, fileLen, true);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -481,7 +481,7 @@ public class TestDataTransferProtocol {
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil));
0L, -1L-random.nextInt(oneMil), true);
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -494,14 +494,14 @@ public class TestDataTransferProtocol {
recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1);
0L, fileLen+1, true);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen);
0L, fileLen, true);
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();


@@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -56,4 +59,11 @@ public class TestParallelRead extends TestParallelReadUtil {
public void testParallelReadMixed() throws IOException {
runTestWorkload(new MixedWorkloadHelper());
}
@Test
public void testParallelNoChecksums() throws IOException {
verifyChecksums = false;
runTestWorkload(new MixedWorkloadHelper());
}
}


@@ -46,6 +46,7 @@ public class TestParallelReadUtil {
static final int FILE_SIZE_K = 256;
static Random rand = null;
static final int DEFAULT_REPLICATION_FACTOR = 2;
protected boolean verifyChecksums = true;
static {
// The client-trace log ends up causing a lot of blocking threads
@@ -317,7 +318,8 @@ public class TestParallelReadUtil {
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
testInfo.dis = dfsClient.open(testInfo.filepath.toString());
testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);
for (int j = 0; j < nWorkerEach; ++j) {
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);


@@ -24,11 +24,14 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.log4j.Level;
import org.junit.Test;
/**
@@ -199,11 +202,19 @@ public class TestPread {
*/
@Test
public void testPreadDFS() throws IOException {
dfsPreadTest(false); //normal pread
dfsPreadTest(true); //trigger read code path without transferTo.
dfsPreadTest(false, true); //normal pread
dfsPreadTest(true, true); //trigger read code path without transferTo.
}
private void dfsPreadTest(boolean disableTransferTo) throws IOException {
@Test
public void testPreadDFSNoChecksum() throws IOException {
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
dfsPreadTest(false, false);
dfsPreadTest(true, false);
}
private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
@@ -215,6 +226,7 @@ public class TestPread {
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fileSys = cluster.getFileSystem();
fileSys.setVerifyChecksum(verifyChecksum);
try {
Path file1 = new Path("preadtest.dat");
writeFile(fileSys, file1);