HDFS-3429. DataNode reads checksums even if client does not need them. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1433117 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
337e066bc3
commit
3052ad1f00
|
@ -482,6 +482,8 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-3429. DataNode reads checksums even if client does not need them (todd)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HDFS-3919. MiniDFSCluster:waitClusterUp can hang forever.
|
||||
|
|
|
@ -380,7 +380,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
|||
// in and out will be closed when sock is closed (by the caller)
|
||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT)));
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
|
||||
verifyChecksum);
|
||||
|
||||
//
|
||||
// Get bytes in block, set streams
|
||||
|
|
|
@ -392,7 +392,8 @@ public class RemoteBlockReader2 implements BlockReader {
|
|||
// in and out will be closed when sock is closed (by the caller)
|
||||
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
|
||||
ioStreams.out));
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
|
||||
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
|
||||
verifyChecksum);
|
||||
|
||||
//
|
||||
// Get bytes in block
|
||||
|
|
|
@ -55,12 +55,15 @@ public interface DataTransferProtocol {
|
|||
* @param clientName client's name.
|
||||
* @param blockOffset offset of the block.
|
||||
* @param length maximum number of bytes for this read.
|
||||
* @param sendChecksum if false, the DN should skip reading and sending
|
||||
* checksums
|
||||
*/
|
||||
public void readBlock(final ExtendedBlock blk,
|
||||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final long blockOffset,
|
||||
final long length) throws IOException;
|
||||
final long length,
|
||||
final boolean sendChecksum) throws IOException;
|
||||
|
||||
/**
|
||||
* Write a block to a datanode pipeline.
|
||||
|
|
|
@ -88,7 +88,8 @@ public abstract class Receiver implements DataTransferProtocol {
|
|||
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
|
||||
proto.getHeader().getClientName(),
|
||||
proto.getOffset(),
|
||||
proto.getLen());
|
||||
proto.getLen(),
|
||||
proto.getSendChecksums());
|
||||
}
|
||||
|
||||
/** Receive OP_WRITE_BLOCK */
|
||||
|
|
|
@ -62,6 +62,10 @@ public class Sender implements DataTransferProtocol {
|
|||
|
||||
private static void send(final DataOutputStream out, final Op opcode,
|
||||
final Message proto) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Sending DataTransferOp " + proto.getClass().getSimpleName()
|
||||
+ ": " + proto);
|
||||
}
|
||||
op(out, opcode);
|
||||
proto.writeDelimitedTo(out);
|
||||
out.flush();
|
||||
|
@ -72,12 +76,14 @@ public class Sender implements DataTransferProtocol {
|
|||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final long blockOffset,
|
||||
final long length) throws IOException {
|
||||
final long length,
|
||||
final boolean sendChecksum) throws IOException {
|
||||
|
||||
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
|
||||
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
|
||||
.setOffset(blockOffset)
|
||||
.setLen(length)
|
||||
.setSendChecksums(sendChecksum)
|
||||
.build();
|
||||
|
||||
send(out, Op.READ_BLOCK, proto);
|
||||
|
|
|
@ -388,8 +388,8 @@ class BlockPoolSliceScanner {
|
|||
try {
|
||||
adjustThrottler();
|
||||
|
||||
blockSender = new BlockSender(block, 0, -1, false, true, datanode,
|
||||
null);
|
||||
blockSender = new BlockSender(block, 0, -1, false, true, true,
|
||||
datanode, null);
|
||||
|
||||
DataOutputStream out =
|
||||
new DataOutputStream(new IOUtils.NullOutputStream());
|
||||
|
|
|
@ -45,6 +45,8 @@ import org.apache.hadoop.io.nativeio.NativeIO;
|
|||
import org.apache.hadoop.net.SocketOutputStream;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Reads a block from the disk and sends it to a recipient.
|
||||
*
|
||||
|
@ -158,12 +160,14 @@ class BlockSender implements java.io.Closeable {
|
|||
* @param length length of data to read
|
||||
* @param corruptChecksumOk
|
||||
* @param verifyChecksum verify checksum while reading the data
|
||||
* @param sendChecksum send checksum to client.
|
||||
* @param datanode datanode from which the block is being read
|
||||
* @param clientTraceFmt format string used to print client trace logs
|
||||
* @throws IOException
|
||||
*/
|
||||
BlockSender(ExtendedBlock block, long startOffset, long length,
|
||||
boolean corruptChecksumOk, boolean verifyChecksum,
|
||||
boolean sendChecksum,
|
||||
DataNode datanode, String clientTraceFmt)
|
||||
throws IOException {
|
||||
try {
|
||||
|
@ -175,6 +179,13 @@ class BlockSender implements java.io.Closeable {
|
|||
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
|
||||
this.datanode = datanode;
|
||||
|
||||
if (verifyChecksum) {
|
||||
// To simplify implementation, callers may not specify verification
|
||||
// without sending.
|
||||
Preconditions.checkArgument(sendChecksum,
|
||||
"If verifying checksum, currently must also send it.");
|
||||
}
|
||||
|
||||
final Replica replica;
|
||||
final long replicaVisibleLength;
|
||||
synchronized(datanode.data) {
|
||||
|
@ -213,29 +224,37 @@ class BlockSender implements java.io.Closeable {
|
|||
* False, True: will verify checksum
|
||||
* False, False: throws IOException file not found
|
||||
*/
|
||||
DataChecksum csum;
|
||||
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
|
||||
if (!corruptChecksumOk || metaIn != null) {
|
||||
if (metaIn == null) {
|
||||
//need checksum but meta-data not found
|
||||
throw new FileNotFoundException("Meta-data not found for " + block);
|
||||
}
|
||||
|
||||
checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
DataChecksum csum = null;
|
||||
if (verifyChecksum || sendChecksum) {
|
||||
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
|
||||
if (!corruptChecksumOk || metaIn != null) {
|
||||
if (metaIn == null) {
|
||||
//need checksum but meta-data not found
|
||||
throw new FileNotFoundException("Meta-data not found for " + block);
|
||||
}
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
||||
+ block + " ignoring ...");
|
||||
checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
||||
+ block + " ignoring ...");
|
||||
}
|
||||
csum = header.getChecksum();
|
||||
} else {
|
||||
LOG.warn("Could not find metadata file for " + block);
|
||||
}
|
||||
csum = header.getChecksum();
|
||||
} else {
|
||||
LOG.warn("Could not find metadata file for " + block);
|
||||
// This only decides the buffer size. Use BUFFER_SIZE?
|
||||
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 16 * 1024);
|
||||
}
|
||||
if (csum == null) {
|
||||
// The number of bytes per checksum here determines the alignment
|
||||
// of reads: we always start reading at a checksum chunk boundary,
|
||||
// even if the checksum type is NULL. So, choosing too big of a value
|
||||
// would risk sending too much unnecessary data. 512 (1 disk sector)
|
||||
// is likely to result in minimal extra IO.
|
||||
csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -1441,7 +1441,7 @@ public class DataNode extends Configured
|
|||
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
in = new DataInputStream(unbufIn);
|
||||
blockSender = new BlockSender(b, 0, b.getNumBytes(),
|
||||
false, false, DataNode.this, null);
|
||||
false, false, true, DataNode.this, null);
|
||||
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
||||
|
||||
//
|
||||
|
|
|
@ -241,7 +241,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
final Token<BlockTokenIdentifier> blockToken,
|
||||
final String clientName,
|
||||
final long blockOffset,
|
||||
final long length) throws IOException {
|
||||
final long length,
|
||||
final boolean sendChecksum) throws IOException {
|
||||
previousOpClientName = clientName;
|
||||
|
||||
OutputStream baseStream = getOutputStream();
|
||||
|
@ -266,7 +267,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
try {
|
||||
try {
|
||||
blockSender = new BlockSender(block, blockOffset, length,
|
||||
true, false, datanode, clientTraceFmt);
|
||||
true, false, sendChecksum, datanode, clientTraceFmt);
|
||||
} catch(IOException e) {
|
||||
String msg = "opReadBlock " + block + " received exception " + e;
|
||||
LOG.info(msg);
|
||||
|
@ -654,7 +655,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
|
||||
try {
|
||||
// check if the block exists or not
|
||||
blockSender = new BlockSender(block, 0, -1, false, false, datanode,
|
||||
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
|
||||
null);
|
||||
|
||||
// set up response stream
|
||||
|
|
|
@ -52,6 +52,7 @@ message OpReadBlockProto {
|
|||
required ClientOperationHeaderProto header = 1;
|
||||
required uint64 offset = 2;
|
||||
required uint64 len = 3;
|
||||
optional bool sendChecksums = 4 [default = true];
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -444,21 +444,21 @@ public class TestDataTransferProtocol {
|
|||
recvBuf.reset();
|
||||
blk.setBlockId(blkid-1);
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
0L, fileLen);
|
||||
0L, fileLen, true);
|
||||
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
|
||||
|
||||
// negative block start offset -1L
|
||||
sendBuf.reset();
|
||||
blk.setBlockId(blkid);
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
-1L, fileLen);
|
||||
-1L, fileLen, true);
|
||||
sendRecvData("Negative start-offset for read for block " +
|
||||
firstBlock.getBlockId(), false);
|
||||
|
||||
// bad block start offset
|
||||
sendBuf.reset();
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
fileLen, fileLen);
|
||||
fileLen, fileLen, true);
|
||||
sendRecvData("Wrong start-offset for reading block " +
|
||||
firstBlock.getBlockId(), false);
|
||||
|
||||
|
@ -475,7 +475,7 @@ public class TestDataTransferProtocol {
|
|||
|
||||
sendBuf.reset();
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
0L, -1L-random.nextInt(oneMil));
|
||||
0L, -1L-random.nextInt(oneMil), true);
|
||||
sendRecvData("Negative length for reading block " +
|
||||
firstBlock.getBlockId(), false);
|
||||
|
||||
|
@ -488,14 +488,14 @@ public class TestDataTransferProtocol {
|
|||
recvOut);
|
||||
sendBuf.reset();
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
0L, fileLen+1);
|
||||
0L, fileLen+1, true);
|
||||
sendRecvData("Wrong length for reading block " +
|
||||
firstBlock.getBlockId(), false);
|
||||
|
||||
//At the end of all this, read the file to make sure that succeeds finally.
|
||||
sendBuf.reset();
|
||||
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
|
||||
0L, fileLen);
|
||||
0L, fileLen, true);
|
||||
readFile(fileSys, file, fileLen);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
|
|
|
@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -56,4 +59,11 @@ public class TestParallelRead extends TestParallelReadUtil {
|
|||
public void testParallelReadMixed() throws IOException {
|
||||
runTestWorkload(new MixedWorkloadHelper());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParallelNoChecksums() throws IOException {
|
||||
verifyChecksums = false;
|
||||
runTestWorkload(new MixedWorkloadHelper());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ public class TestParallelReadUtil {
|
|||
static final int FILE_SIZE_K = 256;
|
||||
static Random rand = null;
|
||||
static final int DEFAULT_REPLICATION_FACTOR = 2;
|
||||
protected boolean verifyChecksums = true;
|
||||
|
||||
static {
|
||||
// The client-trace log ends up causing a lot of blocking threads
|
||||
|
@ -317,7 +318,8 @@ public class TestParallelReadUtil {
|
|||
|
||||
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
|
||||
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
|
||||
testInfo.dis = dfsClient.open(testInfo.filepath.toString());
|
||||
testInfo.dis = dfsClient.open(testInfo.filepath.toString(),
|
||||
dfsClient.dfsClientConf.ioBufferSize, verifyChecksums);
|
||||
|
||||
for (int j = 0; j < nWorkerEach; ++j) {
|
||||
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
|
||||
|
|
|
@ -24,11 +24,14 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
|
||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -194,11 +197,19 @@ public class TestPread {
|
|||
*/
|
||||
@Test
|
||||
public void testPreadDFS() throws IOException {
|
||||
dfsPreadTest(false); //normal pread
|
||||
dfsPreadTest(true); //trigger read code path without transferTo.
|
||||
dfsPreadTest(false, true); //normal pread
|
||||
dfsPreadTest(true, true); //trigger read code path without transferTo.
|
||||
}
|
||||
|
||||
private void dfsPreadTest(boolean disableTransferTo) throws IOException {
|
||||
@Test
|
||||
public void testPreadDFSNoChecksum() throws IOException {
|
||||
((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
|
||||
dfsPreadTest(false, false);
|
||||
dfsPreadTest(true, false);
|
||||
}
|
||||
|
||||
private void dfsPreadTest(boolean disableTransferTo, boolean verifyChecksum)
|
||||
throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
|
||||
conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
|
||||
|
@ -210,6 +221,7 @@ public class TestPread {
|
|||
}
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
FileSystem fileSys = cluster.getFileSystem();
|
||||
fileSys.setVerifyChecksum(verifyChecksum);
|
||||
try {
|
||||
Path file1 = new Path("preadtest.dat");
|
||||
writeFile(fileSys, file1);
|
||||
|
|
Loading…
Reference in New Issue