From e90a5b40430cc1fbce075d34b31e3cc05fd9831f Mon Sep 17 00:00:00 2001
From: Suresh Srinivas
Date: Thu, 29 Sep 2011 04:40:15 +0000
Subject: [PATCH] HDFS-2371. Refactor BlockSender.java for better readability.
 Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177161 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |   5 +-
 .../datanode/BlockPoolSliceScanner.java        |   2 +-
 .../hdfs/server/datanode/BlockSender.java      | 561 +++++++++++-------
 .../hadoop/hdfs/server/datanode/DataNode.java  |   2 +-
 .../hdfs/server/datanode/DataXceiver.java      |   2 +-
 5 files changed, 341 insertions(+), 231 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43c360fcb0c..d911107e6b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -21,7 +21,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
-    HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
+    HADOOP-7524 Change RPC to allow multiple protocols including multiple
+    versions of the same protocol (sanjay Radia)
 
     HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
     HdfsConstants. (Harsh J Chouraria via atm)
@@ -50,6 +51,8 @@ Trunk (unreleased changes)
     HDFS-2355. Federation: enable using the same configuration file across
     all the nodes in the cluster. (suresh)
 
+    HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+
   BUG FIXES
 
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index f6dc94d32e5..2a53b3dd78a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -404,7 +404,7 @@ private void verifyBlock(ExtendedBlock block) {
       adjustThrottler();
 
       blockSender = new BlockSender(block, 0, -1, false, false, true,
-          datanode);
+          datanode, null);
 
       DataOutputStream out =
           new DataOutputStream(new IOUtils.NullOutputStream());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index b9e3858f3e0..84b38b37e9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -41,191 +41,230 @@
 /**
  * Reads a block from the disk and sends it to a recipient.
+ *
+ * Data sent from the BlockSender is in the following format:
+ * Data format:
+ *    +--------------------------------------------------+
+ *    | ChecksumHeader | Sequence of data PACKETS...     |
+ *    +--------------------------------------------------+ 
+ * 
+ * ChecksumHeader format:
+ *    +--------------------------------------------------+
+ *    | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
+ *    +--------------------------------------------------+ 
+ * 
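For illustration, the 5-byte checksum header above can be parsed with plain java.io primitives. The following self-contained sketch is hypothetical and not part of this patch; it assumes type code 1 denotes CRC32, which should be checked against DataChecksum before relying on it.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    class ChecksumHeaderSketch {
      public static void main(String[] args) throws IOException {
        // A header as the DataNode would send it: CHECKSUM_TYPE = 1 (assumed CRC32),
        // BYTES_PER_CHECKSUM = 512 (0x00000200, big-endian).
        byte[] header = {1, 0, 0, 2, 0};
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(header));
        int checksumType = in.readByte();    // 1 byte CHECKSUM_TYPE
        int bytesPerChecksum = in.readInt(); // 4 byte BYTES_PER_CHECKSUM
        System.out.println("type=" + checksumType
            + " bytesPerChecksum=" + bytesPerChecksum); // type=1 bytesPerChecksum=512
      }
    }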
+ * An empty packet is sent to mark the end of the block and read completion.
+ *
+ * A PACKET contains a packet header, checksum and data. The amount of data
+ * carried is set by BUFFER_SIZE.
+ *
+ *    +-----------------------------------------------------+
+ *    | 4 byte packet length (excluding packet header)      |
+ *    +-----------------------------------------------------+
+ *    | 8 byte offset in the block | 8 byte sequence number |
+ *    +-----------------------------------------------------+
+ *    | 1 byte isLastPacketInBlock                          |
+ *    +-----------------------------------------------------+
+ *    | 4 byte Length of actual data                        |
+ *    +-----------------------------------------------------+
+ *    | x byte checksum data. x is defined below            |
+ *    +-----------------------------------------------------+
+ *    | actual data ......                                  |
+ *    +-----------------------------------------------------+
+ *    
+ *    Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+ *    A checksum is calculated for each chunk.
+ *    
+ *    x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+ *        CHECKSUM_SIZE
+ *        
+ *    CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually 4, for CRC32)
+ *    
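A worked example of the formula, with hypothetical numbers: for BYTES_PER_CHECKSUM = 512 and CRC32 (CHECKSUM_SIZE = 4), a packet carrying 1000 bytes of data holds (1000 + 511)/512 = 2 chunks, so x = 2 * 4 = 8 bytes of checksum data. The sketch below (an invented class, not code from this patch) lays out the 25-byte packet header exactly as drawn above; the packetLen computation mirrors dataLen + checksumDataLen + 4 in this patch's sendPacket().

    import java.nio.ByteBuffer;

    class PacketHeaderSketch {
      public static void main(String[] args) {
        final int bytesPerChecksum = 512; // chunk size from the checksum header
        final int checksumSize = 4;       // CRC32
        int dataLen = 1000;               // bytes of actual data in this packet
        int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum; // 2
        int x = numChunks * checksumSize; // 8 bytes of checksum data
        int packetLen = dataLen + x + 4;  // the length field excludes the header

        ByteBuffer pkt = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4); // 25-byte header
        pkt.putInt(packetLen);   // 4 byte packet length
        pkt.putLong(0L);         // 8 byte offset in the block
        pkt.putLong(7L);         // 8 byte sequence number
        pkt.put((byte) 0);       // 1 byte isLastPacketInBlock = false
        pkt.putInt(dataLen);     // 4 byte length of actual data
        // x bytes of checksums and dataLen bytes of data follow on the wire
        System.out.println("numChunks=" + numChunks + " x=" + x
            + " packetLen=" + packetLen); // numChunks=2 x=8 packetLen=1012
      }
    }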
+ *
+ * The client reads data until it receives a packet with
+ * "LastPacketInBlock" set to true or with a zero length. If there is
+ * no checksum error, it replies to the DataNode with OP_STATUS_CHECKSUM_OK:
+ *
+ *    +------------------------------+
+ *    | 2 byte OP_STATUS_CHECKSUM_OK |
+ *    +------------------------------+
+ *  
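To make the exchange concrete, here is a minimal hypothetical client-side loop written against the packet layout above. The class, the stream wiring and the value 6 for OP_STATUS_CHECKSUM_OK are assumptions for illustration only; the real logic lives in the HDFS client, which also verifies each chunk's checksum, a step this sketch skips.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class BlockReadAckSketch {
      static final short OP_STATUS_CHECKSUM_OK = 6; // assumed value, for illustration

      static void readAndAck(DataInputStream in, DataOutputStream out)
          throws IOException {
        while (true) {
          int packetLen = in.readInt();                 // 4 byte packet length
          long offsetInBlock = in.readLong();           // 8 byte offset in the block
          long seqno = in.readLong();                   // 8 byte sequence number
          boolean lastPacketInBlock = in.readBoolean(); // 1 byte
          int dataLen = in.readInt();                   // 4 byte length of actual data
          // In this patch packetLen = dataLen + checksumDataLen + 4, so after
          // the header, packetLen - 4 bytes of checksums and data remain.
          byte[] checksumsAndData = new byte[packetLen - 4];
          in.readFully(checksumsAndData); // a real client verifies each chunk here
          if (lastPacketInBlock || dataLen == 0) {
            break; // the empty packet marks the end of the block
          }
        }
        out.writeShort(OP_STATUS_CHECKSUM_OK); // 2 byte reply when checksums matched
        out.flush();
      }
    }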
*/ class BlockSender implements java.io.Closeable { - public static final Log LOG = DataNode.LOG; + static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; - - private ExtendedBlock block; // the block to read from - - /** the replica to read from */ - private final Replica replica; - /** The visible length of a replica. */ - private final long replicaVisibleLength; - - private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32"); - - private InputStream blockIn; // data stream - private long blockInPosition = -1; // updated while using transferTo(). - private DataInputStream checksumIn; // checksum datastream - private DataChecksum checksum; // checksum stream - private long offset; // starting position to read - private long endOffset; // ending position - private int bytesPerChecksum; // chunk size - private int checksumSize; // checksum size - private boolean corruptChecksumOk; // if need to verify checksum - private boolean chunkOffsetOK; // if need to send chunk offset - private long seqno; // sequence number of packet - - private boolean transferToAllowed = true; - // set once entire requested byte range has been sent to the client - private boolean sentEntireByteRange; - private boolean verifyChecksum; //if true, check is verified while reading - private DataTransferThrottler throttler; - private final String clientTraceFmt; // format of client trace log message - + private static final boolean is32Bit = + System.getProperty("sun.arch.data.model").equals("32"); /** * Minimum buffer used while sending data to clients. Used only if * transferTo() is enabled. 64KB is not that large. It could be larger, but * not sure if there will be much more improvement. */ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; - private volatile ChunkChecksum lastChunkChecksum = null; - + private static final int TRANSFERTO_BUFFER_SIZE = Math.max( + HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); - BlockSender(ExtendedBlock block, long startOffset, long length, - boolean corruptChecksumOk, boolean chunkOffsetOK, - boolean verifyChecksum, DataNode datanode) throws IOException { - this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK, - verifyChecksum, datanode, null); - } - + /** the block to read from */ + private final ExtendedBlock block; + /** the replica to read from */ + private final Replica replica; + /** The visible length of a replica. 
*/
+  private final long replicaVisibleLength;
+  /** Stream to read block data from */
+  private InputStream blockIn;
+  /** updated while using transferTo() */
+  private long blockInPosition = -1;
+  /** Stream to read checksum */
+  private DataInputStream checksumIn;
+  /** Checksum utility */
+  private final DataChecksum checksum;
+  /** Starting position to read */
+  private long offset;
+  /** Position of last byte to read from block file */
+  private final long endOffset;
+  /** Number of bytes in a chunk used for computing checksum */
+  private final int chunkSize;
+  /** Number of bytes of checksum computed for a chunk */
+  private final int checksumSize;
+  /** If true, failure to read checksum is ignored */
+  private final boolean corruptChecksumOk;
+  /** true if the chunk offset needs to be sent in the checksum header */
+  private final boolean chunkOffsetOK;
+  /** Sequence number of packet being sent */
+  private long seqno;
+  /** Set to true if transferTo is allowed for sending data to the client */
+  private final boolean transferToAllowed;
+  /** Set to true once entire requested byte range has been sent to the client */
+  private boolean sentEntireByteRange;
+  /** When true, verify checksum while reading from checksum file */
+  private final boolean verifyChecksum;
+  /** Format used to print client trace log messages */
+  private final String clientTraceFmt;
+  private volatile ChunkChecksum lastChunkChecksum = null;
+
+  /**
+   * Constructor
+   *
+   * @param block Block that is being read
+   * @param startOffset starting offset to read from
+   * @param length length of data to read
+   * @param corruptChecksumOk if true, failure to read checksum is ignored
+   * @param chunkOffsetOK whether the chunk offset needs to be sent in the
+   *        checksum header
+   * @param verifyChecksum verify checksum while reading the data
+   * @param datanode datanode from which the block is being read
+   * @param clientTraceFmt format string used to print client trace logs
+   * @throws IOException
+   */
   BlockSender(ExtendedBlock block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
+      this.chunkOffsetOK = chunkOffsetOK;
+      this.corruptChecksumOk = corruptChecksumOk;
+      this.verifyChecksum = verifyChecksum;
+      this.clientTraceFmt = clientTraceFmt;
+
       synchronized(datanode.data) {
-        this.replica = datanode.data.getReplica(block.getBlockPoolId(),
-            block.getBlockId());
-        if (replica == null) {
-          throw new ReplicaNotFoundException(block);
-        }
+        this.replica = getReplica(block, datanode);
         this.replicaVisibleLength = replica.getVisibleLength();
       }
-      long minEndOffset = startOffset + length;
-      // if this is a write in progress
+      // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
       if (replica instanceof ReplicaBeingWritten) {
-        for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ie) {
-            throw new IOException(ie);
-          }
-        }
-
-        long currentBytesOnDisk = replica.getBytesOnDisk();
-
-        if (currentBytesOnDisk < minEndOffset) {
-          throw new IOException(String.format(
-            "need %d bytes, but only %d bytes available",
-            minEndOffset,
-            currentBytesOnDisk
-          ));
-        }
-
+        long minEndOffset = startOffset + length;
+        waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
         ReplicaInPipeline rip = (ReplicaInPipeline) replica;
         chunkChecksum = rip.getLastChecksumAndDataLen();
       }
 
       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-        throw new IOException(
"replica.getGenerationStamp() < block.getGenerationStamp(), block=" + throw new IOException("Replica gen stamp < block genstamp, block=" + block + ", replica=" + replica); } if (replicaVisibleLength < 0) { - throw new IOException("The replica is not readable, block=" + throw new IOException("Replica is not readable, block=" + block + ", replica=" + replica); } if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("block=" + block + ", replica=" + replica); } - - this.chunkOffsetOK = chunkOffsetOK; - this.corruptChecksumOk = corruptChecksumOk; - this.verifyChecksum = verifyChecksum; // transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // use normal transfer in those cases this.transferToAllowed = datanode.transferToAllowed && - (!is32Bit || length < (long) Integer.MAX_VALUE); - this.clientTraceFmt = clientTraceFmt; + (!is32Bit || length <= Integer.MAX_VALUE); - if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) { + DataChecksum csum; + if (!corruptChecksumOk || datanode.data.metaFileExists(block)) { checksumIn = new DataInputStream(new BufferedInputStream(datanode.data .getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE)); // read and handle the common header here. For now just a version - BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - short version = header.getVersion(); - + BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); + short version = header.getVersion(); if (version != FSDataset.METADATA_VERSION) { LOG.warn("Wrong version (" + version + ") for metadata file for " + block + " ignoring ..."); } - checksum = header.getChecksum(); + csum = header.getChecksum(); } else { LOG.warn("Could not find metadata file for " + block); // This only decides the buffer size. Use BUFFER_SIZE? - checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, + csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 16 * 1024); } - /* If bytesPerChecksum is very large, then the metadata file - * is mostly corrupted. For now just truncate bytesPerchecksum to - * blockLength. - */ - bytesPerChecksum = checksum.getBytesPerChecksum(); - if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) { - checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(), + /* + * If chunkSize is very large, then the metadata file is mostly + * corrupted. For now just truncate bytesPerchecksum to blockLength. + */ + int size = csum.getBytesPerChecksum(); + if (size > 10*1024*1024 && size > replicaVisibleLength) { + csum = DataChecksum.newDataChecksum(csum.getChecksumType(), Math.max((int)replicaVisibleLength, 10*1024*1024)); - bytesPerChecksum = checksum.getBytesPerChecksum(); + size = csum.getBytesPerChecksum(); } + chunkSize = size; + checksum = csum; checksumSize = checksum.getChecksumSize(); - - if (length < 0) { - length = replicaVisibleLength; - } + length = length < 0 ? replicaVisibleLength : length; // end is either last byte on disk or the length for which we have a // checksum - if (chunkChecksum != null) { - endOffset = chunkChecksum.getDataLength(); - } else { - endOffset = replica.getBytesOnDisk(); - } - - if (startOffset < 0 || startOffset > endOffset - || (length + startOffset) > endOffset) { + long end = chunkChecksum != null ? 
+          chunkChecksum.getDataLength() : replica.getBytesOnDisk();
+      if (startOffset < 0 || startOffset > end
+          || (length + startOffset) > end) {
         String msg = " Offset " + startOffset + " and length " + length
-            + " don't match block " + block + " ( blockLen " + endOffset + " )";
+            + " don't match block " + block + " ( blockLen " + end + " )";
         LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId())
             + ":sendBlock() : " + msg);
         throw new IOException(msg);
       }
-
-      offset = (startOffset - (startOffset % bytesPerChecksum));
+
+      // Ensure read offset is positioned at the beginning of a chunk
+      offset = startOffset - (startOffset % chunkSize);
       if (length >= 0) {
-        // Make sure endOffset points to end of a checksumed chunk.
+        // Ensure endOffset points to the end of a chunk.
         long tmpLen = startOffset + length;
-        if (tmpLen % bytesPerChecksum != 0) {
-          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
+        if (tmpLen % chunkSize != 0) {
+          tmpLen += (chunkSize - tmpLen % chunkSize);
         }
-        if (tmpLen < endOffset) {
+        if (tmpLen < end) {
           // will use on-disk checksum here since the end is a stable chunk
-          endOffset = tmpLen;
+          end = tmpLen;
         } else if (chunkChecksum != null) {
-          //in last chunk which is changing. flag that we need to use in-memory
-          // checksum
+          // last chunk is changing. flag that we need to use in-memory checksum
          this.lastChunkChecksum = chunkChecksum;
         }
       }
+      endOffset = end;
 
       // seek to the right offsets
       if (offset > 0) {
-        long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
+        long checksumSkip = (offset / chunkSize) * checksumSize;
         // note blockInStream is seeked when created below
         if (checksumSkip > 0) {
           // Should we use seek() for checksum file as well?
@@ -237,7 +276,6 @@ class BlockSender implements java.io.Closeable {
       if (DataNode.LOG.isDebugEnabled()) {
         DataNode.LOG.debug("replica=" + replica);
       }
-
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
@@ -251,19 +289,17 @@ class BlockSender implements java.io.Closeable {
    */
   public void close() throws IOException {
     IOException ioe = null;
-    // close checksum file
     if(checksumIn!=null) {
       try {
-        checksumIn.close();
+        checksumIn.close(); // close checksum file
       } catch (IOException e) {
         ioe = e;
       }
       checksumIn = null;
-    }
-    // close data file
+    }
     if(blockIn!=null) {
       try {
-        blockIn.close();
+        blockIn.close(); // close data file
       } catch (IOException e) {
         ioe = e;
       }
@@ -274,7 +310,41 @@ public void close() throws IOException {
       throw ioe;
     }
   }
-
+
+  private static Replica getReplica(ExtendedBlock block, DataNode datanode)
+      throws ReplicaNotFoundException {
+    Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
+        block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    return replica;
+  }
+
+  /**
+   * Wait for the rbw replica to reach the given length
+   * @param rbw replica that is being written to
+   * @param len minimum length to reach
+   * @throws IOException on failure to reach len within the given wait time
+   */
+  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+      throws IOException {
+    // Wait for 3 seconds for rbw replica to reach the minimum length
+    for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    if (bytesOnDisk < len) {
+      throw new IOException(
+          String.format("Need %d bytes, but only %d bytes available", len,
+              bytesOnDisk));
+    }
+  }
+
   /**
    * Converts an IOException (not subclasses) to SocketException.
    * This is typically done to indicate to upper layers that the error
@@ -296,54 +366,43 @@ private static IOException ioeToSocketException(IOException ioe) {
   }
 
   /**
-   * Sends upto maxChunks chunks of data.
-   *
-   * When blockInPosition is >= 0, assumes 'out' is a
-   * {@link SocketOutputStream} and tries
-   * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
-   * send data (and updates blockInPosition).
+   * @param datalen Length of data
+   * @return number of chunks for data of given size
    */
-  private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
-      throws IOException {
-    // Sends multiple chunks in one packet with a single write().
+  private int numberOfChunks(long datalen) {
+    return (int) ((datalen + chunkSize - 1)/chunkSize);
+  }
+
+  /**
+   * Sends a packet with up to maxChunks chunks of data.
+   *
+   * @param pkt buffer used for writing packet data
+   * @param maxChunks maximum number of chunks to send
+   * @param out stream to send data to
+   * @param transferTo use transferTo to send data
+   * @param throttler used for throttling data transfer bandwidth
+   */
+  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
+      boolean transferTo, DataTransferThrottler throttler) throws IOException {
+    int dataLen = (int) Math.min(endOffset - offset,
+        (chunkSize * (long) maxChunks));
+
+    int numChunks = numberOfChunks(dataLen); // Number of chunks to be sent in the packet
+    int checksumDataLen = numChunks * checksumSize;
+    int packetLen = dataLen + checksumDataLen + 4;
+    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
 
-    int len = (int) Math.min(endOffset - offset,
-        (((long) bytesPerChecksum) * ((long) maxChunks)));
-    int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
-    int packetLen = len + numChunks*checksumSize + 4;
-    boolean lastDataPacket = offset + len == endOffset && len > 0;
-    pkt.clear();
-
-
-    PacketHeader header = new PacketHeader(
-      packetLen, offset, seqno, (len == 0), len);
-    header.putInBuffer(pkt);
+    writePacketHeader(pkt, dataLen, packetLen);
 
     int checksumOff = pkt.position();
-    int checksumLen = numChunks * checksumSize;
     byte[] buf = pkt.array();
 
     if (checksumSize > 0 && checksumIn != null) {
-      try {
-        checksumIn.readFully(buf, checksumOff, checksumLen);
-      } catch (IOException e) {
-        LOG.warn(" Could not read or failed to veirfy checksum for data"
-            + " at offset " + offset + " for block " + block, e);
-        IOUtils.closeStream(checksumIn);
-        checksumIn = null;
-        if (corruptChecksumOk) {
-          if (checksumOff < checksumLen) {
-            // Just fill the array with zeros.
-            Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
-          }
-        } else {
-          throw e;
-        }
-      }
+      readChecksum(buf, checksumOff, checksumDataLen);
 
       // write in progress that we need to use to get last checksum
       if (lastDataPacket && lastChunkChecksum != null) {
-        int start = checksumOff + checksumLen - checksumSize;
+        int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
 
         if (updatedChecksum != null) {
@@ -352,52 +411,28 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
       }
     }
 
-    int dataOff = checksumOff + checksumLen;
-
-    if (blockInPosition < 0) {
-      //normal transfer
-      IOUtils.readFully(blockIn, buf, dataOff, len);
+    int dataOff = checksumOff + checksumDataLen;
+    if (!transferTo) { // normal transfer
+      IOUtils.readFully(blockIn, buf, dataOff, dataLen);
 
       if (verifyChecksum) {
-        int dOff = dataOff;
-        int cOff = checksumOff;
-        int dLeft = len;
-
-        for (int i=0; i<numChunks; i++) {
-          checksum.reset();
-          int dLen = Math.min(dLeft, bytesPerChecksum);
-          checksum.update(buf, dOff, dLen);
-          if (!checksum.compare(buf, cOff)) {
-            long failedPos = offset + len - dLeft;
-            throw new ChecksumException("Checksum failed at " + failedPos,
-                failedPos);
-          }
-          dLeft -= dLen;
-          dOff += dLen;
-          cOff += checksumSize;
-        }
+        verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
      }
     }
 
     try {
-      if (blockInPosition >= 0) {
-        //use transferTo(). Checks on out and blockIn are already done.
-
+      if (transferTo) {
         SocketOutputStream sockOut = (SocketOutputStream)out;
-        //first write the packet
-        sockOut.write(buf, 0, dataOff);
+        sockOut.write(buf, 0, dataOff); // First write header and checksums
+
         // no need to flush. since we know out is not a buffered stream.
         sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
-            blockInPosition, len);
-
-        blockInPosition += len;
-      } else {
+            blockInPosition, dataLen);
+        blockInPosition += dataLen;
+      } else {
         // normal transfer
-        out.write(buf, 0, dataOff + len);
+        out.write(buf, 0, dataOff + dataLen);
       }
-
     } catch (IOException e) {
       /* Exception while writing to the client. Connection closure from
        * the other end is mostly the case and we do not care much about
@@ -419,9 +454,72 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
       throttler.throttle(packetLen);
     }
 
-    return len;
+    return dataLen;
   }
+
+  /**
+   * Read checksum into given buffer
+   * @param buf buffer to read the checksum into
+   * @param checksumOffset offset at which to write the checksum into buf
+   * @param checksumLen length of checksum data to read
+   * @throws IOException on error
+   */
+  private void readChecksum(byte[] buf, final int checksumOffset,
+      final int checksumLen) throws IOException {
+    if (checksumSize <= 0 && checksumIn == null) {
+      return;
+    }
+    try {
+      checksumIn.readFully(buf, checksumOffset, checksumLen);
+    } catch (IOException e) {
+      LOG.warn(" Could not read or failed to verify checksum for data"
+          + " at offset " + offset + " for block " + block, e);
+      IOUtils.closeStream(checksumIn);
+      checksumIn = null;
+      if (corruptChecksumOk) {
+        if (checksumOffset < checksumLen) {
+          // Just fill the array with zeros.
+          Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
+        }
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Compute checksum for chunks and verify that the checksum read from
+   * the metadata file is correct.
+ * + * @param buf buffer that has checksum and data + * @param dataOffset position where data is written in the buf + * @param datalen length of data + * @param numChunks number of chunks corresponding to data + * @param checksumOffset offset where checksum is written in the buf + * @throws ChecksumException on failed checksum verification + */ + public void verifyChecksum(final byte[] buf, final int dataOffset, + final int datalen, final int numChunks, final int checksumOffset) + throws ChecksumException { + int dOff = dataOffset; + int cOff = checksumOffset; + int dLeft = datalen; + for (int i = 0; i < numChunks; i++) { + checksum.reset(); + int dLen = Math.min(dLeft, chunkSize); + checksum.update(buf, dOff, dLen); + if (!checksum.compare(buf, cOff)) { + long failedPos = offset + datalen - dLeft; + throw new ChecksumException("Checksum failed at " + failedPos, + failedPos); + } + dLeft -= dLen; + dOff += dLen; + cOff += checksumSize; + } + } + /** * sendBlock() is used to read block and its metadata and stream the data to * either a client or to another datanode. @@ -433,70 +531,54 @@ private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) * {@link SocketOutputStream#transferToFully(FileChannel, * long, int)}. * @param throttler for sending data. - * @return total bytes reads, including crc. + * @return total bytes read, including checksum data. */ long sendBlock(DataOutputStream out, OutputStream baseStream, DataTransferThrottler throttler) throws IOException { - if( out == null ) { + if (out == null) { throw new IOException( "out stream is null" ); } - this.throttler = throttler; - - long initialOffset = offset; + final long initialOffset = offset; long totalRead = 0; OutputStream streamForSendChunks = out; final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; try { - try { - checksum.writeHeader(out); - if ( chunkOffsetOK ) { - out.writeLong( offset ); - } - out.flush(); - } catch (IOException e) { //socket error - throw ioeToSocketException(e); - } + writeChecksumHeader(out); int maxChunksPerPacket; int pktSize = PacketHeader.PKT_HEADER_LEN; - - if (transferToAllowed && !verifyChecksum && - baseStream instanceof SocketOutputStream && - blockIn instanceof FileInputStream) { - + boolean transferTo = transferToAllowed && !verifyChecksum + && baseStream instanceof SocketOutputStream + && blockIn instanceof FileInputStream; + if (transferTo) { FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); - - // blockInPosition also indicates sendChunks() uses transferTo. blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; + maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); - // assure a mininum buffer size. - maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE, - MIN_BUFFER_WITH_TRANSFERTO) - + bytesPerChecksum - 1)/bytesPerChecksum; - - // allocate smaller buffer while using transferTo(). 
+ // Smaller packet size to only hold checksum when doing transferTo pktSize += checksumSize * maxChunksPerPacket; } else { - maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE - + bytesPerChecksum - 1) / bytesPerChecksum); - pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; + maxChunksPerPacket = Math.max(1, + numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE)); + // Packet size includes both checksum and data + pktSize += (chunkSize + checksumSize) * maxChunksPerPacket; } ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); while (endOffset > offset) { - long len = sendChunks(pktBuf, maxChunksPerPacket, - streamForSendChunks); + long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, + transferTo, throttler); offset += len; - totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* - checksumSize); + totalRead += len + (numberOfChunks(len) * checksumSize); seqno++; } try { // send an empty packet to mark the end of the block - sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks); + sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, + throttler); out.flush(); } catch (IOException e) { //socket error throw ioeToSocketException(e); @@ -506,14 +588,39 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, } finally { if (clientTraceFmt != null) { final long endTime = System.nanoTime(); - ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); + ClientTraceLog.info(String.format(clientTraceFmt, totalRead, + initialOffset, endTime - startTime)); } close(); } - return totalRead; } + /** + * Write checksum header to the output stream + */ + private void writeChecksumHeader(DataOutputStream out) throws IOException { + try { + checksum.writeHeader(out); + if (chunkOffsetOK) { + out.writeLong(offset); + } + out.flush(); + } catch (IOException e) { //socket error + throw ioeToSocketException(e); + } + } + + /** + * Write packet header into {@code pkt} + */ + private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { + pkt.clear(); + PacketHeader header = new PacketHeader(packetLen, offset, seqno, + (dataLen == 0), dataLen); + header.putInBuffer(pkt); + } + boolean didSendEntireByteRange() { return sentEntireByteRange; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index b6ec0c05b4d..d4f5bc19f75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -2058,7 +2058,7 @@ public void run() { out = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, false, DataNode.this); + false, false, false, DataNode.this, null); DatanodeInfo srcNode = new DatanodeInfo(bpReg); // diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 8d7d95f8aa3..fdcdc18a341 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -597,7 +597,7 @@ public void copyBlock(final ExtendedBlock block, try { // check if the block exists or not blockSender = new BlockSender(block, 0, -1, false, false, false, - datanode); + datanode, null); // set up response stream OutputStream baseStream = NetUtils.getOutputStream(