HDFS-2371. Refactor BlockSender.java for better readability. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-09-29 04:40:15 +00:00
parent e9dd78d9fe
commit e90a5b4043
5 changed files with 341 additions and 231 deletions

View File

@ -21,7 +21,8 @@ Trunk (unreleased changes)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) HADOOP-7524 Change RPC to allow multiple protocols including multuple
versions of the same protocol (sanjay Radia)
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants -> HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
HdfsConstants. (Harsh J Chouraria via atm) HdfsConstants. (Harsh J Chouraria via atm)
@ -50,6 +51,8 @@ Trunk (unreleased changes)
HDFS-2355. Federation: enable using the same configuration file across HDFS-2355. Federation: enable using the same configuration file across
all the nodes in the cluster. (suresh) all the nodes in the cluster. (suresh)
HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
BUG FIXES BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd) HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

View File

@ -404,7 +404,7 @@ private void verifyBlock(ExtendedBlock block) {
adjustThrottler(); adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, false, true, blockSender = new BlockSender(block, 0, -1, false, false, true,
datanode); datanode, null);
DataOutputStream out = DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream()); new DataOutputStream(new IOUtils.NullOutputStream());

View File

@ -41,118 +41,162 @@
/** /**
* Reads a block from the disk and sends it to a recipient. * Reads a block from the disk and sends it to a recipient.
*
* Data sent from the BlockeSender in the following format:
* <br><b>Data format:</b> <pre>
* +--------------------------------------------------+
* | ChecksumHeader | Sequence of data PACKETS... |
* +--------------------------------------------------+
* </pre>
* <b>ChecksumHeader format:</b> <pre>
* +--------------------------------------------------+
* | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
* +--------------------------------------------------+
* </pre>
* An empty packet is sent to mark the end of block and read completion.
*
* PACKET Contains a packet header, checksum and data. Amount of data
* carried is set by BUFFER_SIZE.
* <pre>
* +-----------------------------------------------------+
* | 4 byte packet length (excluding packet header) |
* +-----------------------------------------------------+
* | 8 byte offset in the block | 8 byte sequence number |
* +-----------------------------------------------------+
* | 1 byte isLastPacketInBlock |
* +-----------------------------------------------------+
* | 4 byte Length of actual data |
* +-----------------------------------------------------+
* | x byte checksum data. x is defined below |
* +-----------------------------------------------------+
* | actual data ...... |
* +-----------------------------------------------------+
*
* Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
* A checksum is calculated for each chunk.
*
* x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
* CHECKSUM_SIZE
*
* CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
* </pre>
*
* The client reads data until it receives a packet with
* "LastPacketInBlock" set to true or with a zero length. If there is
* no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
* <pre>
* +------------------------------+
* | 2 byte OP_STATUS_CHECKSUM_OK |
* +------------------------------+
* </pre>
*/ */
class BlockSender implements java.io.Closeable { class BlockSender implements java.io.Closeable {
public static final Log LOG = DataNode.LOG; static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog; static final Log ClientTraceLog = DataNode.ClientTraceLog;
private static final boolean is32Bit =
private ExtendedBlock block; // the block to read from System.getProperty("sun.arch.data.model").equals("32");
/** the replica to read from */
private final Replica replica;
/** The visible length of a replica. */
private final long replicaVisibleLength;
private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
private InputStream blockIn; // data stream
private long blockInPosition = -1; // updated while using transferTo().
private DataInputStream checksumIn; // checksum datastream
private DataChecksum checksum; // checksum stream
private long offset; // starting position to read
private long endOffset; // ending position
private int bytesPerChecksum; // chunk size
private int checksumSize; // checksum size
private boolean corruptChecksumOk; // if need to verify checksum
private boolean chunkOffsetOK; // if need to send chunk offset
private long seqno; // sequence number of packet
private boolean transferToAllowed = true;
// set once entire requested byte range has been sent to the client
private boolean sentEntireByteRange;
private boolean verifyChecksum; //if true, check is verified while reading
private DataTransferThrottler throttler;
private final String clientTraceFmt; // format of client trace log message
/** /**
* Minimum buffer used while sending data to clients. Used only if * Minimum buffer used while sending data to clients. Used only if
* transferTo() is enabled. 64KB is not that large. It could be larger, but * transferTo() is enabled. 64KB is not that large. It could be larger, but
* not sure if there will be much more improvement. * not sure if there will be much more improvement.
*/ */
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
/** the block to read from */
private final ExtendedBlock block;
/** the replica to read from */
private final Replica replica;
/** The visible length of a replica. */
private final long replicaVisibleLength;
/** Stream to read block data from */
private InputStream blockIn;
/** updated while using transferTo() */
private long blockInPosition = -1;
/** Stream to read checksum */
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
/** Starting position to read */
private long offset;
/** Position of last byte to read from block file */
private final long endOffset;
/** Number of bytes in chunk used for computing checksum */
private final int chunkSize;
/** Number bytes of checksum computed for a chunk */
private final int checksumSize;
/** If true, failure to read checksum is ignored */
private final boolean corruptChecksumOk;
/** true if chunk offset is needed to be sent in Checksum header */
private final boolean chunkOffsetOK;
/** Sequence number of packet being sent */
private long seqno;
/** Set to true if transferTo is allowed for sending data to the client */
private final boolean transferToAllowed;
/** Set to true once entire requested byte range has been sent to the client */
private boolean sentEntireByteRange;
/** When true, verify checksum while reading from checksum file */
private final boolean verifyChecksum;
/** Format used to print client trace log messages */
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null; private volatile ChunkChecksum lastChunkChecksum = null;
/**
BlockSender(ExtendedBlock block, long startOffset, long length, * Constructor
boolean corruptChecksumOk, boolean chunkOffsetOK, *
boolean verifyChecksum, DataNode datanode) throws IOException { * @param block Block that is being read
this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK, * @param startOffset starting offset to read from
verifyChecksum, datanode, null); * @param length length of data to read
} * @param corruptChecksumOk
* @param chunkOffsetOK need to send check offset in checksum header
* @param verifyChecksum verify checksum while reading the data
* @param datanode datanode from which the block is being read
* @param clientTraceFmt format string used to print client trace logs
* @throws IOException
*/
BlockSender(ExtendedBlock block, long startOffset, long length, BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean chunkOffsetOK, boolean corruptChecksumOk, boolean chunkOffsetOK,
boolean verifyChecksum, DataNode datanode, String clientTraceFmt) boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
throws IOException { throws IOException {
try { try {
this.block = block; this.block = block;
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
synchronized(datanode.data) { synchronized(datanode.data) {
this.replica = datanode.data.getReplica(block.getBlockPoolId(), this.replica = getReplica(block, datanode);
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
this.replicaVisibleLength = replica.getVisibleLength(); this.replicaVisibleLength = replica.getVisibleLength();
} }
long minEndOffset = startOffset + length; // if there is a write in progress
// if this is a write in progress
ChunkChecksum chunkChecksum = null; ChunkChecksum chunkChecksum = null;
if (replica instanceof ReplicaBeingWritten) { if (replica instanceof ReplicaBeingWritten) {
for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) { long minEndOffset = startOffset + length;
try { waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
Thread.sleep(100);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
long currentBytesOnDisk = replica.getBytesOnDisk();
if (currentBytesOnDisk < minEndOffset) {
throw new IOException(String.format(
"need %d bytes, but only %d bytes available",
minEndOffset,
currentBytesOnDisk
));
}
ReplicaInPipeline rip = (ReplicaInPipeline) replica; ReplicaInPipeline rip = (ReplicaInPipeline) replica;
chunkChecksum = rip.getLastChecksumAndDataLen(); chunkChecksum = rip.getLastChecksumAndDataLen();
} }
if (replica.getGenerationStamp() < block.getGenerationStamp()) { if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException( throw new IOException("Replica gen stamp < block genstamp, block="
"replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ block + ", replica=" + replica); + block + ", replica=" + replica);
} }
if (replicaVisibleLength < 0) { if (replicaVisibleLength < 0) {
throw new IOException("The replica is not readable, block=" throw new IOException("Replica is not readable, block="
+ block + ", replica=" + replica); + block + ", replica=" + replica);
} }
if (DataNode.LOG.isDebugEnabled()) { if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("block=" + block + ", replica=" + replica); DataNode.LOG.debug("block=" + block + ", replica=" + replica);
} }
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
// use normal transfer in those cases // use normal transfer in those cases
this.transferToAllowed = datanode.transferToAllowed && this.transferToAllowed = datanode.transferToAllowed &&
(!is32Bit || length < (long) Integer.MAX_VALUE); (!is32Bit || length <= Integer.MAX_VALUE);
this.clientTraceFmt = clientTraceFmt;
DataChecksum csum;
if (!corruptChecksumOk || datanode.data.metaFileExists(block)) { if (!corruptChecksumOk || datanode.data.metaFileExists(block)) {
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
.getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE)); .getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
@ -160,72 +204,67 @@ class BlockSender implements java.io.Closeable {
// read and handle the common header here. For now just a version // read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion(); short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) { if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for " LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ..."); + block + " ignoring ...");
} }
checksum = header.getChecksum(); csum = header.getChecksum();
} else { } else {
LOG.warn("Could not find metadata file for " + block); LOG.warn("Could not find metadata file for " + block);
// This only decides the buffer size. Use BUFFER_SIZE? // This only decides the buffer size. Use BUFFER_SIZE?
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
16 * 1024); 16 * 1024);
} }
/* If bytesPerChecksum is very large, then the metadata file /*
* is mostly corrupted. For now just truncate bytesPerchecksum to * If chunkSize is very large, then the metadata file is mostly
* blockLength. * corrupted. For now just truncate bytesPerchecksum to blockLength.
*/ */
bytesPerChecksum = checksum.getBytesPerChecksum(); int size = csum.getBytesPerChecksum();
if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) { if (size > 10*1024*1024 && size > replicaVisibleLength) {
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(), csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
Math.max((int)replicaVisibleLength, 10*1024*1024)); Math.max((int)replicaVisibleLength, 10*1024*1024));
bytesPerChecksum = checksum.getBytesPerChecksum(); size = csum.getBytesPerChecksum();
} }
chunkSize = size;
checksum = csum;
checksumSize = checksum.getChecksumSize(); checksumSize = checksum.getChecksumSize();
length = length < 0 ? replicaVisibleLength : length;
if (length < 0) {
length = replicaVisibleLength;
}
// end is either last byte on disk or the length for which we have a // end is either last byte on disk or the length for which we have a
// checksum // checksum
if (chunkChecksum != null) { long end = chunkChecksum != null ? chunkChecksum.getDataLength()
endOffset = chunkChecksum.getDataLength(); : replica.getBytesOnDisk();
} else { if (startOffset < 0 || startOffset > end
endOffset = replica.getBytesOnDisk(); || (length + startOffset) > end) {
}
if (startOffset < 0 || startOffset > endOffset
|| (length + startOffset) > endOffset) {
String msg = " Offset " + startOffset + " and length " + length String msg = " Offset " + startOffset + " and length " + length
+ " don't match block " + block + " ( blockLen " + endOffset + " )"; + " don't match block " + block + " ( blockLen " + end + " )";
LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) + LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
":sendBlock() : " + msg); ":sendBlock() : " + msg);
throw new IOException(msg); throw new IOException(msg);
} }
offset = (startOffset - (startOffset % bytesPerChecksum)); // Ensure read offset is position at the beginning of chunk
offset = startOffset - (startOffset % chunkSize);
if (length >= 0) { if (length >= 0) {
// Make sure endOffset points to end of a checksumed chunk. // Ensure endOffset points to end of chunk.
long tmpLen = startOffset + length; long tmpLen = startOffset + length;
if (tmpLen % bytesPerChecksum != 0) { if (tmpLen % chunkSize != 0) {
tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum); tmpLen += (chunkSize - tmpLen % chunkSize);
} }
if (tmpLen < endOffset) { if (tmpLen < end) {
// will use on-disk checksum here since the end is a stable chunk // will use on-disk checksum here since the end is a stable chunk
endOffset = tmpLen; end = tmpLen;
} else if (chunkChecksum != null) { } else if (chunkChecksum != null) {
//in last chunk which is changing. flag that we need to use in-memory // last chunk is changing. flag that we need to use in-memory checksum
// checksum
this.lastChunkChecksum = chunkChecksum; this.lastChunkChecksum = chunkChecksum;
} }
} }
endOffset = end;
// seek to the right offsets // seek to the right offsets
if (offset > 0) { if (offset > 0) {
long checksumSkip = (offset / bytesPerChecksum) * checksumSize; long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below // note blockInStream is seeked when created below
if (checksumSkip > 0) { if (checksumSkip > 0) {
// Should we use seek() for checksum file as well? // Should we use seek() for checksum file as well?
@ -237,7 +276,6 @@ class BlockSender implements java.io.Closeable {
if (DataNode.LOG.isDebugEnabled()) { if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("replica=" + replica); DataNode.LOG.debug("replica=" + replica);
} }
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
} catch (IOException ioe) { } catch (IOException ioe) {
IOUtils.closeStream(this); IOUtils.closeStream(this);
@ -251,19 +289,17 @@ class BlockSender implements java.io.Closeable {
*/ */
public void close() throws IOException { public void close() throws IOException {
IOException ioe = null; IOException ioe = null;
// close checksum file
if(checksumIn!=null) { if(checksumIn!=null) {
try { try {
checksumIn.close(); checksumIn.close(); // close checksum file
} catch (IOException e) { } catch (IOException e) {
ioe = e; ioe = e;
} }
checksumIn = null; checksumIn = null;
} }
// close data file
if(blockIn!=null) { if(blockIn!=null) {
try { try {
blockIn.close(); blockIn.close(); // close data file
} catch (IOException e) { } catch (IOException e) {
ioe = e; ioe = e;
} }
@ -275,6 +311,40 @@ public void close() throws IOException {
} }
} }
private static Replica getReplica(ExtendedBlock block, DataNode datanode)
throws ReplicaNotFoundException {
Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
return replica;
}
/**
* Wait for rbw replica to reach the length
* @param rbw replica that is being written to
* @param len minimum length to reach
* @throws IOException on failing to reach the len in given wait time
*/
private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
throws IOException {
// Wait for 3 seconds for rbw replica to reach the minimum length
for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
throw new IOException(ie);
}
}
long bytesOnDisk = rbw.getBytesOnDisk();
if (bytesOnDisk < len) {
throw new IOException(
String.format("Need %d bytes, but only %d bytes available", len,
bytesOnDisk));
}
}
/** /**
* Converts an IOExcpetion (not subclasses) to SocketException. * Converts an IOExcpetion (not subclasses) to SocketException.
* This is typically done to indicate to upper layers that the error * This is typically done to indicate to upper layers that the error
@ -296,54 +366,43 @@ private static IOException ioeToSocketException(IOException ioe) {
} }
/** /**
* Sends upto maxChunks chunks of data. * @param datalen Length of data
* * @return number of chunks for data of given size
* When blockInPosition is >= 0, assumes 'out' is a
* {@link SocketOutputStream} and tries
* {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
* send data (and updates blockInPosition).
*/ */
private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) private int numberOfChunks(long datalen) {
throws IOException { return (int) ((datalen + chunkSize - 1)/chunkSize);
// Sends multiple chunks in one packet with a single write(). }
int len = (int) Math.min(endOffset - offset, /**
(((long) bytesPerChecksum) * ((long) maxChunks))); * Sends a packet with up to maxChunks chunks of data.
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; *
int packetLen = len + numChunks*checksumSize + 4; * @param pkt buffer used for writing packet data
boolean lastDataPacket = offset + len == endOffset && len > 0; * @param maxChunks maximum number of chunks to send
pkt.clear(); * @param out stream to send data to
* @param transferTo use transferTo to send data
* @param throttler used for throttling data transfer bandwidth
*/
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
boolean transferTo, DataTransferThrottler throttler) throws IOException {
int dataLen = (int) Math.min(endOffset - offset,
(chunkSize * (long) maxChunks));
int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
int checksumDataLen = numChunks * checksumSize;
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
PacketHeader header = new PacketHeader( writePacketHeader(pkt, dataLen, packetLen);
packetLen, offset, seqno, (len == 0), len);
header.putInBuffer(pkt);
int checksumOff = pkt.position(); int checksumOff = pkt.position();
int checksumLen = numChunks * checksumSize;
byte[] buf = pkt.array(); byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) { if (checksumSize > 0 && checksumIn != null) {
try { readChecksum(buf, checksumOff, checksumDataLen);
checksumIn.readFully(buf, checksumOff, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to veirfy checksum for data"
+ " at offset " + offset + " for block " + block, e);
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumOff < checksumLen) {
// Just fill the array with zeros.
Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
}
} else {
throw e;
}
}
// write in progress that we need to use to get last checksum // write in progress that we need to use to get last checksum
if (lastDataPacket && lastChunkChecksum != null) { if (lastDataPacket && lastChunkChecksum != null) {
int start = checksumOff + checksumLen - checksumSize; int start = checksumOff + checksumDataLen - checksumSize;
byte[] updatedChecksum = lastChunkChecksum.getChecksum(); byte[] updatedChecksum = lastChunkChecksum.getChecksum();
if (updatedChecksum != null) { if (updatedChecksum != null) {
@ -352,52 +411,28 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
} }
} }
int dataOff = checksumOff + checksumLen; int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
if (blockInPosition < 0) { IOUtils.readFully(blockIn, buf, dataOff, dataLen);
//normal transfer
IOUtils.readFully(blockIn, buf, dataOff, len);
if (verifyChecksum) { if (verifyChecksum) {
int dOff = dataOff; verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
int cOff = checksumOff;
int dLeft = len;
for (int i=0; i<numChunks; i++) {
checksum.reset();
int dLen = Math.min(dLeft, bytesPerChecksum);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) {
long failedPos = offset + len -dLeft;
throw new ChecksumException("Checksum failed at " +
failedPos, failedPos);
} }
dLeft -= dLen;
dOff += dLen;
cOff += checksumSize;
}
}
//writing is done below (mainly to handle IOException)
} }
try { try {
if (blockInPosition >= 0) { if (transferTo) {
//use transferTo(). Checks on out and blockIn are already done.
SocketOutputStream sockOut = (SocketOutputStream)out; SocketOutputStream sockOut = (SocketOutputStream)out;
//first write the packet sockOut.write(buf, 0, dataOff); // First write checksum
sockOut.write(buf, 0, dataOff);
// no need to flush. since we know out is not a buffered stream. // no need to flush. since we know out is not a buffered stream.
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
blockInPosition, len); blockInPosition, dataLen);
blockInPosition += dataLen;
blockInPosition += len;
} else { } else {
// normal transfer // normal transfer
out.write(buf, 0, dataOff + len); out.write(buf, 0, dataOff + dataLen);
} }
} catch (IOException e) { } catch (IOException e) {
/* Exception while writing to the client. Connection closure from /* Exception while writing to the client. Connection closure from
* the other end is mostly the case and we do not care much about * the other end is mostly the case and we do not care much about
@ -419,7 +454,70 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
throttler.throttle(packetLen); throttler.throttle(packetLen);
} }
return len; return dataLen;
}
/**
* Read checksum into given buffer
* @param buf buffer to read the checksum into
* @param checksumOffset offset at which to write the checksum into buf
* @param checksumLen length of checksum to write
* @throws IOException on error
*/
private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException {
if (checksumSize <= 0 && checksumIn == null) {
return;
}
try {
checksumIn.readFully(buf, checksumOffset, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to veirfy checksum for data"
+ " at offset " + offset + " for block " + block, e);
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumOffset < checksumLen) {
// Just fill the array with zeros.
Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
}
} else {
throw e;
}
}
}
/**
* Compute checksum for chunks and verify the checksum that is read from
* the metadata file is correct.
*
* @param buf buffer that has checksum and data
* @param dataOffset position where data is written in the buf
* @param datalen length of data
* @param numChunks number of chunks corresponding to data
* @param checksumOffset offset where checksum is written in the buf
* @throws ChecksumException on failed checksum verification
*/
public void verifyChecksum(final byte[] buf, final int dataOffset,
final int datalen, final int numChunks, final int checksumOffset)
throws ChecksumException {
int dOff = dataOffset;
int cOff = checksumOffset;
int dLeft = datalen;
for (int i = 0; i < numChunks; i++) {
checksum.reset();
int dLen = Math.min(dLeft, chunkSize);
checksum.update(buf, dOff, dLen);
if (!checksum.compare(buf, cOff)) {
long failedPos = offset + datalen - dLeft;
throw new ChecksumException("Checksum failed at " + failedPos,
failedPos);
}
dLeft -= dLen;
dOff += dLen;
cOff += checksumSize;
}
} }
/** /**
@ -433,70 +531,54 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
* {@link SocketOutputStream#transferToFully(FileChannel, * {@link SocketOutputStream#transferToFully(FileChannel,
* long, int)}. * long, int)}.
* @param throttler for sending data. * @param throttler for sending data.
* @return total bytes reads, including crc. * @return total bytes read, including checksum data.
*/ */
long sendBlock(DataOutputStream out, OutputStream baseStream, long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException { DataTransferThrottler throttler) throws IOException {
if (out == null) { if (out == null) {
throw new IOException( "out stream is null" ); throw new IOException( "out stream is null" );
} }
this.throttler = throttler; final long initialOffset = offset;
long initialOffset = offset;
long totalRead = 0; long totalRead = 0;
OutputStream streamForSendChunks = out; OutputStream streamForSendChunks = out;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try { try {
try { writeChecksumHeader(out);
checksum.writeHeader(out);
if ( chunkOffsetOK ) {
out.writeLong( offset );
}
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
int maxChunksPerPacket; int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN; int pktSize = PacketHeader.PKT_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
if (transferToAllowed && !verifyChecksum && && baseStream instanceof SocketOutputStream
baseStream instanceof SocketOutputStream && && blockIn instanceof FileInputStream;
blockIn instanceof FileInputStream) { if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
// blockInPosition also indicates sendChunks() uses transferTo.
blockInPosition = fileChannel.position(); blockInPosition = fileChannel.position();
streamForSendChunks = baseStream; streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// assure a mininum buffer size. // Smaller packet size to only hold checksum when doing transferTo
maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE,
MIN_BUFFER_WITH_TRANSFERTO)
+ bytesPerChecksum - 1)/bytesPerChecksum;
// allocate smaller buffer while using transferTo().
pktSize += checksumSize * maxChunksPerPacket; pktSize += checksumSize * maxChunksPerPacket;
} else { } else {
maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE maxChunksPerPacket = Math.max(1,
+ bytesPerChecksum - 1) / bytesPerChecksum); numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; // Packet size includes both checksum and data
pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
} }
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) { while (endOffset > offset) {
long len = sendChunks(pktBuf, maxChunksPerPacket, long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
streamForSendChunks); transferTo, throttler);
offset += len; offset += len;
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* totalRead += len + (numberOfChunks(len) * checksumSize);
checksumSize);
seqno++; seqno++;
} }
try { try {
// send an empty packet to mark the end of the block // send an empty packet to mark the end of the block
sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
throttler);
out.flush(); out.flush();
} catch (IOException e) { //socket error } catch (IOException e) { //socket error
throw ioeToSocketException(e); throw ioeToSocketException(e);
@ -506,14 +588,39 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
} finally { } finally {
if (clientTraceFmt != null) { if (clientTraceFmt != null) {
final long endTime = System.nanoTime(); final long endTime = System.nanoTime();
ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
} }
close(); close();
} }
return totalRead; return totalRead;
} }
/**
* Write checksum header to the output stream
*/
private void writeChecksumHeader(DataOutputStream out) throws IOException {
try {
checksum.writeHeader(out);
if (chunkOffsetOK) {
out.writeLong(offset);
}
out.flush();
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
}
/**
* Write packet header into {@code pkt}
*/
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen);
header.putInBuffer(pkt);
}
boolean didSendEntireByteRange() { boolean didSendEntireByteRange() {
return sentEntireByteRange; return sentEntireByteRange;
} }

View File

@ -2058,7 +2058,7 @@ public void run() {
out = new DataOutputStream(new BufferedOutputStream(baseStream, out = new DataOutputStream(new BufferedOutputStream(baseStream,
HdfsConstants.SMALL_BUFFER_SIZE)); HdfsConstants.SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, b.getNumBytes(), blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, false, DataNode.this); false, false, false, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg); DatanodeInfo srcNode = new DatanodeInfo(bpReg);
// //

View File

@ -597,7 +597,7 @@ public void copyBlock(final ExtendedBlock block,
try { try {
// check if the block exists or not // check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, false, blockSender = new BlockSender(block, 0, -1, false, false, false,
datanode); datanode, null);
// set up response stream // set up response stream
OutputStream baseStream = NetUtils.getOutputStream( OutputStream baseStream = NetUtils.getOutputStream(