diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43c360fcb0c..d911107e6b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -21,7 +21,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
- HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
+ HADOOP-7524 Change RPC to allow multiple protocols including multiple
+ versions of the same protocol (sanjay Radia)
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->
HdfsConstants. (Harsh J Chouraria via atm)
@@ -50,6 +51,8 @@ Trunk (unreleased changes)
HDFS-2355. Federation: enable using the same configuration file across
all the nodes in the cluster. (suresh)
+ HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
+
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index f6dc94d32e5..2a53b3dd78a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -404,7 +404,7 @@ class BlockPoolSliceScanner {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, false, true,
- datanode);
+ datanode, null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index b9e3858f3e0..84b38b37e9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -41,191 +41,230 @@ import org.apache.hadoop.util.DataChecksum;
/**
* Reads a block from the disk and sends it to a recipient.
+ *
+ * Data is sent from the BlockSender in the following format:
+ *
Data format:
+ * +--------------------------------------------------+ + * | ChecksumHeader | Sequence of data PACKETS... | + * +--------------------------------------------------+ + *+ * ChecksumHeader format:
+ * +--------------------------------------------------+ + * | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM | + * +--------------------------------------------------+ + *+ * An empty packet is sent to mark the end of block and read completion. + * + * PACKET Contains a packet header, checksum and data. Amount of data + * carried is set by BUFFER_SIZE. + *
+ * +-----------------------------------------------------+ + * | 4 byte packet length (excluding packet header) | + * +-----------------------------------------------------+ + * | 8 byte offset in the block | 8 byte sequence number | + * +-----------------------------------------------------+ + * | 1 byte isLastPacketInBlock | + * +-----------------------------------------------------+ + * | 4 byte Length of actual data | + * +-----------------------------------------------------+ + * | x byte checksum data. x is defined below | + * +-----------------------------------------------------+ + * | actual data ...... | + * +-----------------------------------------------------+ + * + * Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM. + * A checksum is calculated for each chunk. + * + * x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM * + * CHECKSUM_SIZE + * + * CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32) + *+ * + * The client reads data until it receives a packet with + * "LastPacketInBlock" set to true or with a zero length. If there is + * no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: + *
+ * +------------------------------+ + * | 2 byte OP_STATUS_CHECKSUM_OK | + * +------------------------------+ + **/ class BlockSender implements java.io.Closeable { - public static final Log LOG = DataNode.LOG; + static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; - - private ExtendedBlock block; // the block to read from - - /** the replica to read from */ - private final Replica replica; - /** The visible length of a replica. */ - private final long replicaVisibleLength; - - private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32"); - - private InputStream blockIn; // data stream - private long blockInPosition = -1; // updated while using transferTo(). - private DataInputStream checksumIn; // checksum datastream - private DataChecksum checksum; // checksum stream - private long offset; // starting position to read - private long endOffset; // ending position - private int bytesPerChecksum; // chunk size - private int checksumSize; // checksum size - private boolean corruptChecksumOk; // if need to verify checksum - private boolean chunkOffsetOK; // if need to send chunk offset - private long seqno; // sequence number of packet - - private boolean transferToAllowed = true; - // set once entire requested byte range has been sent to the client - private boolean sentEntireByteRange; - private boolean verifyChecksum; //if true, check is verified while reading - private DataTransferThrottler throttler; - private final String clientTraceFmt; // format of client trace log message - + private static final boolean is32Bit = + System.getProperty("sun.arch.data.model").equals("32"); /** * Minimum buffer used while sending data to clients. Used only if * transferTo() is enabled. 64KB is not that large. It could be larger, but * not sure if there will be much more improvement. 
*/ private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; - private volatile ChunkChecksum lastChunkChecksum = null; - + private static final int TRANSFERTO_BUFFER_SIZE = Math.max( + HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO); - BlockSender(ExtendedBlock block, long startOffset, long length, - boolean corruptChecksumOk, boolean chunkOffsetOK, - boolean verifyChecksum, DataNode datanode) throws IOException { - this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK, - verifyChecksum, datanode, null); - } - + /** the block to read from */ + private final ExtendedBlock block; + /** the replica to read from */ + private final Replica replica; + /** The visible length of a replica. */ + private final long replicaVisibleLength; + /** Stream to read block data from */ + private InputStream blockIn; + /** updated while using transferTo() */ + private long blockInPosition = -1; + /** Stream to read checksum */ + private DataInputStream checksumIn; + /** Checksum utility */ + private final DataChecksum checksum; + /** Starting position to read */ + private long offset; + /** Position of last byte to read from block file */ + private final long endOffset; + /** Number of bytes in chunk used for computing checksum */ + private final int chunkSize; + /** Number bytes of checksum computed for a chunk */ + private final int checksumSize; + /** If true, failure to read checksum is ignored */ + private final boolean corruptChecksumOk; + /** true if chunk offset is needed to be sent in Checksum header */ + private final boolean chunkOffsetOK; + /** Sequence number of packet being sent */ + private long seqno; + /** Set to true if transferTo is allowed for sending data to the client */ + private final boolean transferToAllowed; + /** Set to true once entire requested byte range has been sent to the client */ + private boolean sentEntireByteRange; + /** When true, verify checksum while reading from checksum file */ + private final boolean 
verifyChecksum; + /** Format used to print client trace log messages */ + private final String clientTraceFmt; + private volatile ChunkChecksum lastChunkChecksum = null; + + /** + * Constructor + * + * @param block Block that is being read + * @param startOffset starting offset to read from + * @param length length of data to read + * @param corruptChecksumOk + * @param chunkOffsetOK need to send check offset in checksum header + * @param verifyChecksum verify checksum while reading the data + * @param datanode datanode from which the block is being read + * @param clientTraceFmt format string used to print client trace logs + * @throws IOException + */ BlockSender(ExtendedBlock block, long startOffset, long length, boolean corruptChecksumOk, boolean chunkOffsetOK, boolean verifyChecksum, DataNode datanode, String clientTraceFmt) throws IOException { try { this.block = block; + this.chunkOffsetOK = chunkOffsetOK; + this.corruptChecksumOk = corruptChecksumOk; + this.verifyChecksum = verifyChecksum; + this.clientTraceFmt = clientTraceFmt; + synchronized(datanode.data) { - this.replica = datanode.data.getReplica(block.getBlockPoolId(), - block.getBlockId()); - if (replica == null) { - throw new ReplicaNotFoundException(block); - } + this.replica = getReplica(block, datanode); this.replicaVisibleLength = replica.getVisibleLength(); } - long minEndOffset = startOffset + length; - // if this is a write in progress + // if there is a write in progress ChunkChecksum chunkChecksum = null; if (replica instanceof ReplicaBeingWritten) { - for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) { - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - throw new IOException(ie); - } - } - - long currentBytesOnDisk = replica.getBytesOnDisk(); - - if (currentBytesOnDisk < minEndOffset) { - throw new IOException(String.format( - "need %d bytes, but only %d bytes available", - minEndOffset, - currentBytesOnDisk - )); - } - + long minEndOffset = 
startOffset + length; + waitForMinLength((ReplicaBeingWritten)replica, minEndOffset); ReplicaInPipeline rip = (ReplicaInPipeline) replica; chunkChecksum = rip.getLastChecksumAndDataLen(); } if (replica.getGenerationStamp() < block.getGenerationStamp()) { - throw new IOException( - "replica.getGenerationStamp() < block.getGenerationStamp(), block=" + throw new IOException("Replica gen stamp < block genstamp, block=" + block + ", replica=" + replica); } if (replicaVisibleLength < 0) { - throw new IOException("The replica is not readable, block=" + throw new IOException("Replica is not readable, block=" + block + ", replica=" + replica); } if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("block=" + block + ", replica=" + replica); } - - this.chunkOffsetOK = chunkOffsetOK; - this.corruptChecksumOk = corruptChecksumOk; - this.verifyChecksum = verifyChecksum; // transferToFully() fails on 32 bit platforms for block sizes >= 2GB, // use normal transfer in those cases this.transferToAllowed = datanode.transferToAllowed && - (!is32Bit || length < (long) Integer.MAX_VALUE); - this.clientTraceFmt = clientTraceFmt; + (!is32Bit || length <= Integer.MAX_VALUE); - if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) { + DataChecksum csum; + if (!corruptChecksumOk || datanode.data.metaFileExists(block)) { checksumIn = new DataInputStream(new BufferedInputStream(datanode.data .getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE)); // read and handle the common header here. 
For now just a version - BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - short version = header.getVersion(); - + BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); + short version = header.getVersion(); if (version != FSDataset.METADATA_VERSION) { LOG.warn("Wrong version (" + version + ") for metadata file for " + block + " ignoring ..."); } - checksum = header.getChecksum(); + csum = header.getChecksum(); } else { LOG.warn("Could not find metadata file for " + block); // This only decides the buffer size. Use BUFFER_SIZE? - checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, + csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 16 * 1024); } - /* If bytesPerChecksum is very large, then the metadata file - * is mostly corrupted. For now just truncate bytesPerchecksum to - * blockLength. - */ - bytesPerChecksum = checksum.getBytesPerChecksum(); - if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) { - checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(), + /* + * If chunkSize is very large, then the metadata file is mostly + * corrupted. For now just truncate bytesPerchecksum to blockLength. + */ + int size = csum.getBytesPerChecksum(); + if (size > 10*1024*1024 && size > replicaVisibleLength) { + csum = DataChecksum.newDataChecksum(csum.getChecksumType(), Math.max((int)replicaVisibleLength, 10*1024*1024)); - bytesPerChecksum = checksum.getBytesPerChecksum(); + size = csum.getBytesPerChecksum(); } + chunkSize = size; + checksum = csum; checksumSize = checksum.getChecksumSize(); - - if (length < 0) { - length = replicaVisibleLength; - } + length = length < 0 ? 
replicaVisibleLength : length; // end is either last byte on disk or the length for which we have a // checksum - if (chunkChecksum != null) { - endOffset = chunkChecksum.getDataLength(); - } else { - endOffset = replica.getBytesOnDisk(); - } - - if (startOffset < 0 || startOffset > endOffset - || (length + startOffset) > endOffset) { + long end = chunkChecksum != null ? chunkChecksum.getDataLength() + : replica.getBytesOnDisk(); + if (startOffset < 0 || startOffset > end + || (length + startOffset) > end) { String msg = " Offset " + startOffset + " and length " + length - + " don't match block " + block + " ( blockLen " + endOffset + " )"; + + " don't match block " + block + " ( blockLen " + end + " )"; LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) + ":sendBlock() : " + msg); throw new IOException(msg); } - offset = (startOffset - (startOffset % bytesPerChecksum)); + // Ensure read offset is position at the beginning of chunk + offset = startOffset - (startOffset % chunkSize); if (length >= 0) { - // Make sure endOffset points to end of a checksumed chunk. + // Ensure endOffset points to end of chunk. long tmpLen = startOffset + length; - if (tmpLen % bytesPerChecksum != 0) { - tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum); + if (tmpLen % chunkSize != 0) { + tmpLen += (chunkSize - tmpLen % chunkSize); } - if (tmpLen < endOffset) { + if (tmpLen < end) { // will use on-disk checksum here since the end is a stable chunk - endOffset = tmpLen; + end = tmpLen; } else if (chunkChecksum != null) { - //in last chunk which is changing. flag that we need to use in-memory - // checksum + // last chunk is changing. 
flag that we need to use in-memory checksum this.lastChunkChecksum = chunkChecksum; } } + endOffset = end; // seek to the right offsets if (offset > 0) { - long checksumSkip = (offset / bytesPerChecksum) * checksumSize; + long checksumSkip = (offset / chunkSize) * checksumSize; // note blockInStream is seeked when created below if (checksumSkip > 0) { // Should we use seek() for checksum file as well? @@ -237,7 +276,6 @@ class BlockSender implements java.io.Closeable { if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("replica=" + replica); } - blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset } catch (IOException ioe) { IOUtils.closeStream(this); @@ -251,19 +289,17 @@ class BlockSender implements java.io.Closeable { */ public void close() throws IOException { IOException ioe = null; - // close checksum file if(checksumIn!=null) { try { - checksumIn.close(); + checksumIn.close(); // close checksum file } catch (IOException e) { ioe = e; } checksumIn = null; - } - // close data file + } if(blockIn!=null) { try { - blockIn.close(); + blockIn.close(); // close data file } catch (IOException e) { ioe = e; } @@ -274,7 +310,41 @@ class BlockSender implements java.io.Closeable { throw ioe; } } - + + private static Replica getReplica(ExtendedBlock block, DataNode datanode) + throws ReplicaNotFoundException { + Replica replica = datanode.data.getReplica(block.getBlockPoolId(), + block.getBlockId()); + if (replica == null) { + throw new ReplicaNotFoundException(block); + } + return replica; + } + + /** + * Wait for rbw replica to reach the length + * @param rbw replica that is being written to + * @param len minimum length to reach + * @throws IOException on failing to reach the len in given wait time + */ + private static void waitForMinLength(ReplicaBeingWritten rbw, long len) + throws IOException { + // Wait for 3 seconds for rbw replica to reach the minimum length + for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) { + try { + 
Thread.sleep(100); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + } + long bytesOnDisk = rbw.getBytesOnDisk(); + if (bytesOnDisk < len) { + throw new IOException( + String.format("Need %d bytes, but only %d bytes available", len, + bytesOnDisk)); + } + } + /** * Converts an IOExcpetion (not subclasses) to SocketException. * This is typically done to indicate to upper layers that the error @@ -296,54 +366,43 @@ class BlockSender implements java.io.Closeable { } /** - * Sends upto maxChunks chunks of data. - * - * When blockInPosition is >= 0, assumes 'out' is a - * {@link SocketOutputStream} and tries - * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to - * send data (and updates blockInPosition). + * @param datalen Length of data + * @return number of chunks for data of given size */ - private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) - throws IOException { - // Sends multiple chunks in one packet with a single write(). + private int numberOfChunks(long datalen) { + return (int) ((datalen + chunkSize - 1)/chunkSize); + } + + /** + * Sends a packet with up to maxChunks chunks of data. 
+ * + * @param pkt buffer used for writing packet data + * @param maxChunks maximum number of chunks to send + * @param out stream to send data to + * @param transferTo use transferTo to send data + * @param throttler used for throttling data transfer bandwidth + */ + private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out, + boolean transferTo, DataTransferThrottler throttler) throws IOException { + int dataLen = (int) Math.min(endOffset - offset, + (chunkSize * (long) maxChunks)); + + int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet + int checksumDataLen = numChunks * checksumSize; + int packetLen = dataLen + checksumDataLen + 4; + boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0; - int len = (int) Math.min(endOffset - offset, - (((long) bytesPerChecksum) * ((long) maxChunks))); - int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; - int packetLen = len + numChunks*checksumSize + 4; - boolean lastDataPacket = offset + len == endOffset && len > 0; - pkt.clear(); - - - PacketHeader header = new PacketHeader( - packetLen, offset, seqno, (len == 0), len); - header.putInBuffer(pkt); + writePacketHeader(pkt, dataLen, packetLen); int checksumOff = pkt.position(); - int checksumLen = numChunks * checksumSize; byte[] buf = pkt.array(); if (checksumSize > 0 && checksumIn != null) { - try { - checksumIn.readFully(buf, checksumOff, checksumLen); - } catch (IOException e) { - LOG.warn(" Could not read or failed to veirfy checksum for data" - + " at offset " + offset + " for block " + block, e); - IOUtils.closeStream(checksumIn); - checksumIn = null; - if (corruptChecksumOk) { - if (checksumOff < checksumLen) { - // Just fill the array with zeros. 
- Arrays.fill(buf, checksumOff, checksumLen, (byte) 0); - } - } else { - throw e; - } - } + readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum if (lastDataPacket && lastChunkChecksum != null) { - int start = checksumOff + checksumLen - checksumSize; + int start = checksumOff + checksumDataLen - checksumSize; byte[] updatedChecksum = lastChunkChecksum.getChecksum(); if (updatedChecksum != null) { @@ -352,52 +411,28 @@ class BlockSender implements java.io.Closeable { } } - int dataOff = checksumOff + checksumLen; - - if (blockInPosition < 0) { - //normal transfer - IOUtils.readFully(blockIn, buf, dataOff, len); + int dataOff = checksumOff + checksumDataLen; + if (!transferTo) { // normal transfer + IOUtils.readFully(blockIn, buf, dataOff, dataLen); if (verifyChecksum) { - int dOff = dataOff; - int cOff = checksumOff; - int dLeft = len; - - for (int i=0; i