svn merge -c 1177161 from trunk for HDFS-2371.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1189981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
58063c520f
commit
2f6f7e36eb
|
@ -742,6 +742,8 @@ Release 0.23.0 - Unreleased
|
||||||
HDFS-2355. Federation: enable using the same configuration file across
|
HDFS-2355. Federation: enable using the same configuration file across
|
||||||
all the nodes in the cluster. (suresh)
|
all the nodes in the cluster. (suresh)
|
||||||
|
|
||||||
|
HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||||
|
|
|
@ -404,7 +404,7 @@ class BlockPoolSliceScanner {
|
||||||
adjustThrottler();
|
adjustThrottler();
|
||||||
|
|
||||||
blockSender = new BlockSender(block, 0, -1, false, false, true,
|
blockSender = new BlockSender(block, 0, -1, false, false, true,
|
||||||
datanode);
|
datanode, null);
|
||||||
|
|
||||||
DataOutputStream out =
|
DataOutputStream out =
|
||||||
new DataOutputStream(new IOUtils.NullOutputStream());
|
new DataOutputStream(new IOUtils.NullOutputStream());
|
||||||
|
|
|
@ -41,191 +41,230 @@ import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads a block from the disk and sends it to a recipient.
|
* Reads a block from the disk and sends it to a recipient.
|
||||||
|
*
|
||||||
|
* Data sent from the BlockeSender in the following format:
|
||||||
|
* <br><b>Data format:</b> <pre>
|
||||||
|
* +--------------------------------------------------+
|
||||||
|
* | ChecksumHeader | Sequence of data PACKETS... |
|
||||||
|
* +--------------------------------------------------+
|
||||||
|
* </pre>
|
||||||
|
* <b>ChecksumHeader format:</b> <pre>
|
||||||
|
* +--------------------------------------------------+
|
||||||
|
* | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
|
||||||
|
* +--------------------------------------------------+
|
||||||
|
* </pre>
|
||||||
|
* An empty packet is sent to mark the end of block and read completion.
|
||||||
|
*
|
||||||
|
* PACKET Contains a packet header, checksum and data. Amount of data
|
||||||
|
* carried is set by BUFFER_SIZE.
|
||||||
|
* <pre>
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | 4 byte packet length (excluding packet header) |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | 8 byte offset in the block | 8 byte sequence number |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | 1 byte isLastPacketInBlock |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | 4 byte Length of actual data |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | x byte checksum data. x is defined below |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
* | actual data ...... |
|
||||||
|
* +-----------------------------------------------------+
|
||||||
|
*
|
||||||
|
* Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
|
||||||
|
* A checksum is calculated for each chunk.
|
||||||
|
*
|
||||||
|
* x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
|
||||||
|
* CHECKSUM_SIZE
|
||||||
|
*
|
||||||
|
* CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* The client reads data until it receives a packet with
|
||||||
|
* "LastPacketInBlock" set to true or with a zero length. If there is
|
||||||
|
* no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
|
||||||
|
* <pre>
|
||||||
|
* +------------------------------+
|
||||||
|
* | 2 byte OP_STATUS_CHECKSUM_OK |
|
||||||
|
* +------------------------------+
|
||||||
|
* </pre>
|
||||||
*/
|
*/
|
||||||
class BlockSender implements java.io.Closeable {
|
class BlockSender implements java.io.Closeable {
|
||||||
public static final Log LOG = DataNode.LOG;
|
static final Log LOG = DataNode.LOG;
|
||||||
static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
||||||
|
private static final boolean is32Bit =
|
||||||
private ExtendedBlock block; // the block to read from
|
System.getProperty("sun.arch.data.model").equals("32");
|
||||||
|
|
||||||
/** the replica to read from */
|
|
||||||
private final Replica replica;
|
|
||||||
/** The visible length of a replica. */
|
|
||||||
private final long replicaVisibleLength;
|
|
||||||
|
|
||||||
private static final boolean is32Bit = System.getProperty("sun.arch.data.model").equals("32");
|
|
||||||
|
|
||||||
private InputStream blockIn; // data stream
|
|
||||||
private long blockInPosition = -1; // updated while using transferTo().
|
|
||||||
private DataInputStream checksumIn; // checksum datastream
|
|
||||||
private DataChecksum checksum; // checksum stream
|
|
||||||
private long offset; // starting position to read
|
|
||||||
private long endOffset; // ending position
|
|
||||||
private int bytesPerChecksum; // chunk size
|
|
||||||
private int checksumSize; // checksum size
|
|
||||||
private boolean corruptChecksumOk; // if need to verify checksum
|
|
||||||
private boolean chunkOffsetOK; // if need to send chunk offset
|
|
||||||
private long seqno; // sequence number of packet
|
|
||||||
|
|
||||||
private boolean transferToAllowed = true;
|
|
||||||
// set once entire requested byte range has been sent to the client
|
|
||||||
private boolean sentEntireByteRange;
|
|
||||||
private boolean verifyChecksum; //if true, check is verified while reading
|
|
||||||
private DataTransferThrottler throttler;
|
|
||||||
private final String clientTraceFmt; // format of client trace log message
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Minimum buffer used while sending data to clients. Used only if
|
* Minimum buffer used while sending data to clients. Used only if
|
||||||
* transferTo() is enabled. 64KB is not that large. It could be larger, but
|
* transferTo() is enabled. 64KB is not that large. It could be larger, but
|
||||||
* not sure if there will be much more improvement.
|
* not sure if there will be much more improvement.
|
||||||
*/
|
*/
|
||||||
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
|
private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
|
||||||
|
private static final int TRANSFERTO_BUFFER_SIZE = Math.max(
|
||||||
|
HdfsConstants.IO_FILE_BUFFER_SIZE, MIN_BUFFER_WITH_TRANSFERTO);
|
||||||
|
|
||||||
|
/** the block to read from */
|
||||||
|
private final ExtendedBlock block;
|
||||||
|
/** the replica to read from */
|
||||||
|
private final Replica replica;
|
||||||
|
/** The visible length of a replica. */
|
||||||
|
private final long replicaVisibleLength;
|
||||||
|
/** Stream to read block data from */
|
||||||
|
private InputStream blockIn;
|
||||||
|
/** updated while using transferTo() */
|
||||||
|
private long blockInPosition = -1;
|
||||||
|
/** Stream to read checksum */
|
||||||
|
private DataInputStream checksumIn;
|
||||||
|
/** Checksum utility */
|
||||||
|
private final DataChecksum checksum;
|
||||||
|
/** Starting position to read */
|
||||||
|
private long offset;
|
||||||
|
/** Position of last byte to read from block file */
|
||||||
|
private final long endOffset;
|
||||||
|
/** Number of bytes in chunk used for computing checksum */
|
||||||
|
private final int chunkSize;
|
||||||
|
/** Number bytes of checksum computed for a chunk */
|
||||||
|
private final int checksumSize;
|
||||||
|
/** If true, failure to read checksum is ignored */
|
||||||
|
private final boolean corruptChecksumOk;
|
||||||
|
/** true if chunk offset is needed to be sent in Checksum header */
|
||||||
|
private final boolean chunkOffsetOK;
|
||||||
|
/** Sequence number of packet being sent */
|
||||||
|
private long seqno;
|
||||||
|
/** Set to true if transferTo is allowed for sending data to the client */
|
||||||
|
private final boolean transferToAllowed;
|
||||||
|
/** Set to true once entire requested byte range has been sent to the client */
|
||||||
|
private boolean sentEntireByteRange;
|
||||||
|
/** When true, verify checksum while reading from checksum file */
|
||||||
|
private final boolean verifyChecksum;
|
||||||
|
/** Format used to print client trace log messages */
|
||||||
|
private final String clientTraceFmt;
|
||||||
private volatile ChunkChecksum lastChunkChecksum = null;
|
private volatile ChunkChecksum lastChunkChecksum = null;
|
||||||
|
|
||||||
|
/**
|
||||||
BlockSender(ExtendedBlock block, long startOffset, long length,
|
* Constructor
|
||||||
boolean corruptChecksumOk, boolean chunkOffsetOK,
|
*
|
||||||
boolean verifyChecksum, DataNode datanode) throws IOException {
|
* @param block Block that is being read
|
||||||
this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
|
* @param startOffset starting offset to read from
|
||||||
verifyChecksum, datanode, null);
|
* @param length length of data to read
|
||||||
}
|
* @param corruptChecksumOk
|
||||||
|
* @param chunkOffsetOK need to send check offset in checksum header
|
||||||
|
* @param verifyChecksum verify checksum while reading the data
|
||||||
|
* @param datanode datanode from which the block is being read
|
||||||
|
* @param clientTraceFmt format string used to print client trace logs
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
BlockSender(ExtendedBlock block, long startOffset, long length,
|
BlockSender(ExtendedBlock block, long startOffset, long length,
|
||||||
boolean corruptChecksumOk, boolean chunkOffsetOK,
|
boolean corruptChecksumOk, boolean chunkOffsetOK,
|
||||||
boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
|
boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
try {
|
try {
|
||||||
this.block = block;
|
this.block = block;
|
||||||
|
this.chunkOffsetOK = chunkOffsetOK;
|
||||||
|
this.corruptChecksumOk = corruptChecksumOk;
|
||||||
|
this.verifyChecksum = verifyChecksum;
|
||||||
|
this.clientTraceFmt = clientTraceFmt;
|
||||||
|
|
||||||
synchronized(datanode.data) {
|
synchronized(datanode.data) {
|
||||||
this.replica = datanode.data.getReplica(block.getBlockPoolId(),
|
this.replica = getReplica(block, datanode);
|
||||||
block.getBlockId());
|
|
||||||
if (replica == null) {
|
|
||||||
throw new ReplicaNotFoundException(block);
|
|
||||||
}
|
|
||||||
this.replicaVisibleLength = replica.getVisibleLength();
|
this.replicaVisibleLength = replica.getVisibleLength();
|
||||||
}
|
}
|
||||||
long minEndOffset = startOffset + length;
|
// if there is a write in progress
|
||||||
// if this is a write in progress
|
|
||||||
ChunkChecksum chunkChecksum = null;
|
ChunkChecksum chunkChecksum = null;
|
||||||
if (replica instanceof ReplicaBeingWritten) {
|
if (replica instanceof ReplicaBeingWritten) {
|
||||||
for (int i = 0; i < 30 && replica.getBytesOnDisk() < minEndOffset; i++) {
|
long minEndOffset = startOffset + length;
|
||||||
try {
|
waitForMinLength((ReplicaBeingWritten)replica, minEndOffset);
|
||||||
Thread.sleep(100);
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
throw new IOException(ie);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
long currentBytesOnDisk = replica.getBytesOnDisk();
|
|
||||||
|
|
||||||
if (currentBytesOnDisk < minEndOffset) {
|
|
||||||
throw new IOException(String.format(
|
|
||||||
"need %d bytes, but only %d bytes available",
|
|
||||||
minEndOffset,
|
|
||||||
currentBytesOnDisk
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
ReplicaInPipeline rip = (ReplicaInPipeline) replica;
|
ReplicaInPipeline rip = (ReplicaInPipeline) replica;
|
||||||
chunkChecksum = rip.getLastChecksumAndDataLen();
|
chunkChecksum = rip.getLastChecksumAndDataLen();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
|
||||||
throw new IOException(
|
throw new IOException("Replica gen stamp < block genstamp, block="
|
||||||
"replica.getGenerationStamp() < block.getGenerationStamp(), block="
|
|
||||||
+ block + ", replica=" + replica);
|
+ block + ", replica=" + replica);
|
||||||
}
|
}
|
||||||
if (replicaVisibleLength < 0) {
|
if (replicaVisibleLength < 0) {
|
||||||
throw new IOException("The replica is not readable, block="
|
throw new IOException("Replica is not readable, block="
|
||||||
+ block + ", replica=" + replica);
|
+ block + ", replica=" + replica);
|
||||||
}
|
}
|
||||||
if (DataNode.LOG.isDebugEnabled()) {
|
if (DataNode.LOG.isDebugEnabled()) {
|
||||||
DataNode.LOG.debug("block=" + block + ", replica=" + replica);
|
DataNode.LOG.debug("block=" + block + ", replica=" + replica);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.chunkOffsetOK = chunkOffsetOK;
|
|
||||||
this.corruptChecksumOk = corruptChecksumOk;
|
|
||||||
this.verifyChecksum = verifyChecksum;
|
|
||||||
|
|
||||||
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
|
// transferToFully() fails on 32 bit platforms for block sizes >= 2GB,
|
||||||
// use normal transfer in those cases
|
// use normal transfer in those cases
|
||||||
this.transferToAllowed = datanode.transferToAllowed &&
|
this.transferToAllowed = datanode.transferToAllowed &&
|
||||||
(!is32Bit || length < (long) Integer.MAX_VALUE);
|
(!is32Bit || length <= Integer.MAX_VALUE);
|
||||||
this.clientTraceFmt = clientTraceFmt;
|
|
||||||
|
|
||||||
if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
|
DataChecksum csum;
|
||||||
|
if (!corruptChecksumOk || datanode.data.metaFileExists(block)) {
|
||||||
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
|
checksumIn = new DataInputStream(new BufferedInputStream(datanode.data
|
||||||
.getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
|
.getMetaDataInputStream(block), HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||||
|
|
||||||
// read and handle the common header here. For now just a version
|
// read and handle the common header here. For now just a version
|
||||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||||
short version = header.getVersion();
|
short version = header.getVersion();
|
||||||
|
|
||||||
if (version != FSDataset.METADATA_VERSION) {
|
if (version != FSDataset.METADATA_VERSION) {
|
||||||
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
||||||
+ block + " ignoring ...");
|
+ block + " ignoring ...");
|
||||||
}
|
}
|
||||||
checksum = header.getChecksum();
|
csum = header.getChecksum();
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Could not find metadata file for " + block);
|
LOG.warn("Could not find metadata file for " + block);
|
||||||
// This only decides the buffer size. Use BUFFER_SIZE?
|
// This only decides the buffer size. Use BUFFER_SIZE?
|
||||||
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
|
csum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
|
||||||
16 * 1024);
|
16 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If bytesPerChecksum is very large, then the metadata file
|
/*
|
||||||
* is mostly corrupted. For now just truncate bytesPerchecksum to
|
* If chunkSize is very large, then the metadata file is mostly
|
||||||
* blockLength.
|
* corrupted. For now just truncate bytesPerchecksum to blockLength.
|
||||||
*/
|
*/
|
||||||
bytesPerChecksum = checksum.getBytesPerChecksum();
|
int size = csum.getBytesPerChecksum();
|
||||||
if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > replicaVisibleLength) {
|
if (size > 10*1024*1024 && size > replicaVisibleLength) {
|
||||||
checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
|
csum = DataChecksum.newDataChecksum(csum.getChecksumType(),
|
||||||
Math.max((int)replicaVisibleLength, 10*1024*1024));
|
Math.max((int)replicaVisibleLength, 10*1024*1024));
|
||||||
bytesPerChecksum = checksum.getBytesPerChecksum();
|
size = csum.getBytesPerChecksum();
|
||||||
}
|
}
|
||||||
|
chunkSize = size;
|
||||||
|
checksum = csum;
|
||||||
checksumSize = checksum.getChecksumSize();
|
checksumSize = checksum.getChecksumSize();
|
||||||
|
length = length < 0 ? replicaVisibleLength : length;
|
||||||
if (length < 0) {
|
|
||||||
length = replicaVisibleLength;
|
|
||||||
}
|
|
||||||
|
|
||||||
// end is either last byte on disk or the length for which we have a
|
// end is either last byte on disk or the length for which we have a
|
||||||
// checksum
|
// checksum
|
||||||
if (chunkChecksum != null) {
|
long end = chunkChecksum != null ? chunkChecksum.getDataLength()
|
||||||
endOffset = chunkChecksum.getDataLength();
|
: replica.getBytesOnDisk();
|
||||||
} else {
|
if (startOffset < 0 || startOffset > end
|
||||||
endOffset = replica.getBytesOnDisk();
|
|| (length + startOffset) > end) {
|
||||||
}
|
|
||||||
|
|
||||||
if (startOffset < 0 || startOffset > endOffset
|
|
||||||
|| (length + startOffset) > endOffset) {
|
|
||||||
String msg = " Offset " + startOffset + " and length " + length
|
String msg = " Offset " + startOffset + " and length " + length
|
||||||
+ " don't match block " + block + " ( blockLen " + endOffset + " )";
|
+ " don't match block " + block + " ( blockLen " + end + " )";
|
||||||
LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
|
LOG.warn(datanode.getDNRegistrationForBP(block.getBlockPoolId()) +
|
||||||
":sendBlock() : " + msg);
|
":sendBlock() : " + msg);
|
||||||
throw new IOException(msg);
|
throw new IOException(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
offset = (startOffset - (startOffset % bytesPerChecksum));
|
// Ensure read offset is position at the beginning of chunk
|
||||||
|
offset = startOffset - (startOffset % chunkSize);
|
||||||
if (length >= 0) {
|
if (length >= 0) {
|
||||||
// Make sure endOffset points to end of a checksumed chunk.
|
// Ensure endOffset points to end of chunk.
|
||||||
long tmpLen = startOffset + length;
|
long tmpLen = startOffset + length;
|
||||||
if (tmpLen % bytesPerChecksum != 0) {
|
if (tmpLen % chunkSize != 0) {
|
||||||
tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
|
tmpLen += (chunkSize - tmpLen % chunkSize);
|
||||||
}
|
}
|
||||||
if (tmpLen < endOffset) {
|
if (tmpLen < end) {
|
||||||
// will use on-disk checksum here since the end is a stable chunk
|
// will use on-disk checksum here since the end is a stable chunk
|
||||||
endOffset = tmpLen;
|
end = tmpLen;
|
||||||
} else if (chunkChecksum != null) {
|
} else if (chunkChecksum != null) {
|
||||||
//in last chunk which is changing. flag that we need to use in-memory
|
// last chunk is changing. flag that we need to use in-memory checksum
|
||||||
// checksum
|
|
||||||
this.lastChunkChecksum = chunkChecksum;
|
this.lastChunkChecksum = chunkChecksum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
endOffset = end;
|
||||||
|
|
||||||
// seek to the right offsets
|
// seek to the right offsets
|
||||||
if (offset > 0) {
|
if (offset > 0) {
|
||||||
long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
|
long checksumSkip = (offset / chunkSize) * checksumSize;
|
||||||
// note blockInStream is seeked when created below
|
// note blockInStream is seeked when created below
|
||||||
if (checksumSkip > 0) {
|
if (checksumSkip > 0) {
|
||||||
// Should we use seek() for checksum file as well?
|
// Should we use seek() for checksum file as well?
|
||||||
|
@ -237,7 +276,6 @@ class BlockSender implements java.io.Closeable {
|
||||||
if (DataNode.LOG.isDebugEnabled()) {
|
if (DataNode.LOG.isDebugEnabled()) {
|
||||||
DataNode.LOG.debug("replica=" + replica);
|
DataNode.LOG.debug("replica=" + replica);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
|
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
IOUtils.closeStream(this);
|
IOUtils.closeStream(this);
|
||||||
|
@ -251,19 +289,17 @@ class BlockSender implements java.io.Closeable {
|
||||||
*/
|
*/
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
IOException ioe = null;
|
IOException ioe = null;
|
||||||
// close checksum file
|
|
||||||
if(checksumIn!=null) {
|
if(checksumIn!=null) {
|
||||||
try {
|
try {
|
||||||
checksumIn.close();
|
checksumIn.close(); // close checksum file
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
ioe = e;
|
ioe = e;
|
||||||
}
|
}
|
||||||
checksumIn = null;
|
checksumIn = null;
|
||||||
}
|
}
|
||||||
// close data file
|
|
||||||
if(blockIn!=null) {
|
if(blockIn!=null) {
|
||||||
try {
|
try {
|
||||||
blockIn.close();
|
blockIn.close(); // close data file
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
ioe = e;
|
ioe = e;
|
||||||
}
|
}
|
||||||
|
@ -275,6 +311,40 @@ class BlockSender implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Replica getReplica(ExtendedBlock block, DataNode datanode)
|
||||||
|
throws ReplicaNotFoundException {
|
||||||
|
Replica replica = datanode.data.getReplica(block.getBlockPoolId(),
|
||||||
|
block.getBlockId());
|
||||||
|
if (replica == null) {
|
||||||
|
throw new ReplicaNotFoundException(block);
|
||||||
|
}
|
||||||
|
return replica;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for rbw replica to reach the length
|
||||||
|
* @param rbw replica that is being written to
|
||||||
|
* @param len minimum length to reach
|
||||||
|
* @throws IOException on failing to reach the len in given wait time
|
||||||
|
*/
|
||||||
|
private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
|
||||||
|
throws IOException {
|
||||||
|
// Wait for 3 seconds for rbw replica to reach the minimum length
|
||||||
|
for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
throw new IOException(ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
long bytesOnDisk = rbw.getBytesOnDisk();
|
||||||
|
if (bytesOnDisk < len) {
|
||||||
|
throw new IOException(
|
||||||
|
String.format("Need %d bytes, but only %d bytes available", len,
|
||||||
|
bytesOnDisk));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts an IOExcpetion (not subclasses) to SocketException.
|
* Converts an IOExcpetion (not subclasses) to SocketException.
|
||||||
* This is typically done to indicate to upper layers that the error
|
* This is typically done to indicate to upper layers that the error
|
||||||
|
@ -296,54 +366,43 @@ class BlockSender implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends upto maxChunks chunks of data.
|
* @param datalen Length of data
|
||||||
*
|
* @return number of chunks for data of given size
|
||||||
* When blockInPosition is >= 0, assumes 'out' is a
|
|
||||||
* {@link SocketOutputStream} and tries
|
|
||||||
* {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
|
|
||||||
* send data (and updates blockInPosition).
|
|
||||||
*/
|
*/
|
||||||
private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
|
private int numberOfChunks(long datalen) {
|
||||||
throws IOException {
|
return (int) ((datalen + chunkSize - 1)/chunkSize);
|
||||||
// Sends multiple chunks in one packet with a single write().
|
}
|
||||||
|
|
||||||
int len = (int) Math.min(endOffset - offset,
|
/**
|
||||||
(((long) bytesPerChecksum) * ((long) maxChunks)));
|
* Sends a packet with up to maxChunks chunks of data.
|
||||||
int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
|
*
|
||||||
int packetLen = len + numChunks*checksumSize + 4;
|
* @param pkt buffer used for writing packet data
|
||||||
boolean lastDataPacket = offset + len == endOffset && len > 0;
|
* @param maxChunks maximum number of chunks to send
|
||||||
pkt.clear();
|
* @param out stream to send data to
|
||||||
|
* @param transferTo use transferTo to send data
|
||||||
|
* @param throttler used for throttling data transfer bandwidth
|
||||||
|
*/
|
||||||
|
private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
|
||||||
|
boolean transferTo, DataTransferThrottler throttler) throws IOException {
|
||||||
|
int dataLen = (int) Math.min(endOffset - offset,
|
||||||
|
(chunkSize * (long) maxChunks));
|
||||||
|
|
||||||
|
int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
|
||||||
|
int checksumDataLen = numChunks * checksumSize;
|
||||||
|
int packetLen = dataLen + checksumDataLen + 4;
|
||||||
|
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
|
||||||
|
|
||||||
PacketHeader header = new PacketHeader(
|
writePacketHeader(pkt, dataLen, packetLen);
|
||||||
packetLen, offset, seqno, (len == 0), len);
|
|
||||||
header.putInBuffer(pkt);
|
|
||||||
|
|
||||||
int checksumOff = pkt.position();
|
int checksumOff = pkt.position();
|
||||||
int checksumLen = numChunks * checksumSize;
|
|
||||||
byte[] buf = pkt.array();
|
byte[] buf = pkt.array();
|
||||||
|
|
||||||
if (checksumSize > 0 && checksumIn != null) {
|
if (checksumSize > 0 && checksumIn != null) {
|
||||||
try {
|
readChecksum(buf, checksumOff, checksumDataLen);
|
||||||
checksumIn.readFully(buf, checksumOff, checksumLen);
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn(" Could not read or failed to veirfy checksum for data"
|
|
||||||
+ " at offset " + offset + " for block " + block, e);
|
|
||||||
IOUtils.closeStream(checksumIn);
|
|
||||||
checksumIn = null;
|
|
||||||
if (corruptChecksumOk) {
|
|
||||||
if (checksumOff < checksumLen) {
|
|
||||||
// Just fill the array with zeros.
|
|
||||||
Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// write in progress that we need to use to get last checksum
|
// write in progress that we need to use to get last checksum
|
||||||
if (lastDataPacket && lastChunkChecksum != null) {
|
if (lastDataPacket && lastChunkChecksum != null) {
|
||||||
int start = checksumOff + checksumLen - checksumSize;
|
int start = checksumOff + checksumDataLen - checksumSize;
|
||||||
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
|
byte[] updatedChecksum = lastChunkChecksum.getChecksum();
|
||||||
|
|
||||||
if (updatedChecksum != null) {
|
if (updatedChecksum != null) {
|
||||||
|
@ -352,52 +411,28 @@ class BlockSender implements java.io.Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int dataOff = checksumOff + checksumLen;
|
int dataOff = checksumOff + checksumDataLen;
|
||||||
|
if (!transferTo) { // normal transfer
|
||||||
if (blockInPosition < 0) {
|
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
|
||||||
//normal transfer
|
|
||||||
IOUtils.readFully(blockIn, buf, dataOff, len);
|
|
||||||
|
|
||||||
if (verifyChecksum) {
|
if (verifyChecksum) {
|
||||||
int dOff = dataOff;
|
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
|
||||||
int cOff = checksumOff;
|
|
||||||
int dLeft = len;
|
|
||||||
|
|
||||||
for (int i=0; i<numChunks; i++) {
|
|
||||||
checksum.reset();
|
|
||||||
int dLen = Math.min(dLeft, bytesPerChecksum);
|
|
||||||
checksum.update(buf, dOff, dLen);
|
|
||||||
if (!checksum.compare(buf, cOff)) {
|
|
||||||
long failedPos = offset + len -dLeft;
|
|
||||||
throw new ChecksumException("Checksum failed at " +
|
|
||||||
failedPos, failedPos);
|
|
||||||
}
|
}
|
||||||
dLeft -= dLen;
|
|
||||||
dOff += dLen;
|
|
||||||
cOff += checksumSize;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//writing is done below (mainly to handle IOException)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (blockInPosition >= 0) {
|
if (transferTo) {
|
||||||
//use transferTo(). Checks on out and blockIn are already done.
|
|
||||||
|
|
||||||
SocketOutputStream sockOut = (SocketOutputStream)out;
|
SocketOutputStream sockOut = (SocketOutputStream)out;
|
||||||
//first write the packet
|
sockOut.write(buf, 0, dataOff); // First write checksum
|
||||||
sockOut.write(buf, 0, dataOff);
|
|
||||||
// no need to flush. since we know out is not a buffered stream.
|
// no need to flush. since we know out is not a buffered stream.
|
||||||
|
|
||||||
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
|
sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
|
||||||
blockInPosition, len);
|
blockInPosition, dataLen);
|
||||||
|
blockInPosition += dataLen;
|
||||||
blockInPosition += len;
|
|
||||||
} else {
|
} else {
|
||||||
// normal transfer
|
// normal transfer
|
||||||
out.write(buf, 0, dataOff + len);
|
out.write(buf, 0, dataOff + dataLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
/* Exception while writing to the client. Connection closure from
|
/* Exception while writing to the client. Connection closure from
|
||||||
* the other end is mostly the case and we do not care much about
|
* the other end is mostly the case and we do not care much about
|
||||||
|
@ -419,7 +454,70 @@ class BlockSender implements java.io.Closeable {
|
||||||
throttler.throttle(packetLen);
|
throttler.throttle(packetLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
return len;
|
return dataLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read checksum into given buffer
|
||||||
|
* @param buf buffer to read the checksum into
|
||||||
|
* @param checksumOffset offset at which to write the checksum into buf
|
||||||
|
* @param checksumLen length of checksum to write
|
||||||
|
* @throws IOException on error
|
||||||
|
*/
|
||||||
|
private void readChecksum(byte[] buf, final int checksumOffset,
|
||||||
|
final int checksumLen) throws IOException {
|
||||||
|
if (checksumSize <= 0 && checksumIn == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
checksumIn.readFully(buf, checksumOffset, checksumLen);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn(" Could not read or failed to veirfy checksum for data"
|
||||||
|
+ " at offset " + offset + " for block " + block, e);
|
||||||
|
IOUtils.closeStream(checksumIn);
|
||||||
|
checksumIn = null;
|
||||||
|
if (corruptChecksumOk) {
|
||||||
|
if (checksumOffset < checksumLen) {
|
||||||
|
// Just fill the array with zeros.
|
||||||
|
Arrays.fill(buf, checksumOffset, checksumLen, (byte) 0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute checksum for chunks and verify the checksum that is read from
|
||||||
|
* the metadata file is correct.
|
||||||
|
*
|
||||||
|
* @param buf buffer that has checksum and data
|
||||||
|
* @param dataOffset position where data is written in the buf
|
||||||
|
* @param datalen length of data
|
||||||
|
* @param numChunks number of chunks corresponding to data
|
||||||
|
* @param checksumOffset offset where checksum is written in the buf
|
||||||
|
* @throws ChecksumException on failed checksum verification
|
||||||
|
*/
|
||||||
|
public void verifyChecksum(final byte[] buf, final int dataOffset,
|
||||||
|
final int datalen, final int numChunks, final int checksumOffset)
|
||||||
|
throws ChecksumException {
|
||||||
|
int dOff = dataOffset;
|
||||||
|
int cOff = checksumOffset;
|
||||||
|
int dLeft = datalen;
|
||||||
|
|
||||||
|
for (int i = 0; i < numChunks; i++) {
|
||||||
|
checksum.reset();
|
||||||
|
int dLen = Math.min(dLeft, chunkSize);
|
||||||
|
checksum.update(buf, dOff, dLen);
|
||||||
|
if (!checksum.compare(buf, cOff)) {
|
||||||
|
long failedPos = offset + datalen - dLeft;
|
||||||
|
throw new ChecksumException("Checksum failed at " + failedPos,
|
||||||
|
failedPos);
|
||||||
|
}
|
||||||
|
dLeft -= dLen;
|
||||||
|
dOff += dLen;
|
||||||
|
cOff += checksumSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -433,70 +531,54 @@ class BlockSender implements java.io.Closeable {
|
||||||
* {@link SocketOutputStream#transferToFully(FileChannel,
|
* {@link SocketOutputStream#transferToFully(FileChannel,
|
||||||
* long, int)}.
|
* long, int)}.
|
||||||
* @param throttler for sending data.
|
* @param throttler for sending data.
|
||||||
* @return total bytes reads, including crc.
|
* @return total bytes read, including checksum data.
|
||||||
*/
|
*/
|
||||||
long sendBlock(DataOutputStream out, OutputStream baseStream,
|
long sendBlock(DataOutputStream out, OutputStream baseStream,
|
||||||
DataTransferThrottler throttler) throws IOException {
|
DataTransferThrottler throttler) throws IOException {
|
||||||
if( out == null ) {
|
if (out == null) {
|
||||||
throw new IOException( "out stream is null" );
|
throw new IOException( "out stream is null" );
|
||||||
}
|
}
|
||||||
this.throttler = throttler;
|
final long initialOffset = offset;
|
||||||
|
|
||||||
long initialOffset = offset;
|
|
||||||
long totalRead = 0;
|
long totalRead = 0;
|
||||||
OutputStream streamForSendChunks = out;
|
OutputStream streamForSendChunks = out;
|
||||||
|
|
||||||
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
||||||
try {
|
try {
|
||||||
try {
|
writeChecksumHeader(out);
|
||||||
checksum.writeHeader(out);
|
|
||||||
if ( chunkOffsetOK ) {
|
|
||||||
out.writeLong( offset );
|
|
||||||
}
|
|
||||||
out.flush();
|
|
||||||
} catch (IOException e) { //socket error
|
|
||||||
throw ioeToSocketException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
int maxChunksPerPacket;
|
int maxChunksPerPacket;
|
||||||
int pktSize = PacketHeader.PKT_HEADER_LEN;
|
int pktSize = PacketHeader.PKT_HEADER_LEN;
|
||||||
|
boolean transferTo = transferToAllowed && !verifyChecksum
|
||||||
if (transferToAllowed && !verifyChecksum &&
|
&& baseStream instanceof SocketOutputStream
|
||||||
baseStream instanceof SocketOutputStream &&
|
&& blockIn instanceof FileInputStream;
|
||||||
blockIn instanceof FileInputStream) {
|
if (transferTo) {
|
||||||
|
|
||||||
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
|
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
|
||||||
|
|
||||||
// blockInPosition also indicates sendChunks() uses transferTo.
|
|
||||||
blockInPosition = fileChannel.position();
|
blockInPosition = fileChannel.position();
|
||||||
streamForSendChunks = baseStream;
|
streamForSendChunks = baseStream;
|
||||||
|
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
|
||||||
|
|
||||||
// assure a mininum buffer size.
|
// Smaller packet size to only hold checksum when doing transferTo
|
||||||
maxChunksPerPacket = (Math.max(HdfsConstants.IO_FILE_BUFFER_SIZE,
|
|
||||||
MIN_BUFFER_WITH_TRANSFERTO)
|
|
||||||
+ bytesPerChecksum - 1)/bytesPerChecksum;
|
|
||||||
|
|
||||||
// allocate smaller buffer while using transferTo().
|
|
||||||
pktSize += checksumSize * maxChunksPerPacket;
|
pktSize += checksumSize * maxChunksPerPacket;
|
||||||
} else {
|
} else {
|
||||||
maxChunksPerPacket = Math.max(1, (HdfsConstants.IO_FILE_BUFFER_SIZE
|
maxChunksPerPacket = Math.max(1,
|
||||||
+ bytesPerChecksum - 1) / bytesPerChecksum);
|
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||||
pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
|
// Packet size includes both checksum and data
|
||||||
|
pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
|
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
|
||||||
|
|
||||||
while (endOffset > offset) {
|
while (endOffset > offset) {
|
||||||
long len = sendChunks(pktBuf, maxChunksPerPacket,
|
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
|
||||||
streamForSendChunks);
|
transferTo, throttler);
|
||||||
offset += len;
|
offset += len;
|
||||||
totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
|
totalRead += len + (numberOfChunks(len) * checksumSize);
|
||||||
checksumSize);
|
|
||||||
seqno++;
|
seqno++;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
// send an empty packet to mark the end of the block
|
// send an empty packet to mark the end of the block
|
||||||
sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
|
sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
|
||||||
|
throttler);
|
||||||
out.flush();
|
out.flush();
|
||||||
} catch (IOException e) { //socket error
|
} catch (IOException e) { //socket error
|
||||||
throw ioeToSocketException(e);
|
throw ioeToSocketException(e);
|
||||||
|
@ -506,14 +588,39 @@ class BlockSender implements java.io.Closeable {
|
||||||
} finally {
|
} finally {
|
||||||
if (clientTraceFmt != null) {
|
if (clientTraceFmt != null) {
|
||||||
final long endTime = System.nanoTime();
|
final long endTime = System.nanoTime();
|
||||||
ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
|
ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
|
||||||
|
initialOffset, endTime - startTime));
|
||||||
}
|
}
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalRead;
|
return totalRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write checksum header to the output stream
|
||||||
|
*/
|
||||||
|
private void writeChecksumHeader(DataOutputStream out) throws IOException {
|
||||||
|
try {
|
||||||
|
checksum.writeHeader(out);
|
||||||
|
if (chunkOffsetOK) {
|
||||||
|
out.writeLong(offset);
|
||||||
|
}
|
||||||
|
out.flush();
|
||||||
|
} catch (IOException e) { //socket error
|
||||||
|
throw ioeToSocketException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write packet header into {@code pkt}
|
||||||
|
*/
|
||||||
|
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
|
||||||
|
pkt.clear();
|
||||||
|
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
|
||||||
|
(dataLen == 0), dataLen);
|
||||||
|
header.putInBuffer(pkt);
|
||||||
|
}
|
||||||
|
|
||||||
boolean didSendEntireByteRange() {
|
boolean didSendEntireByteRange() {
|
||||||
return sentEntireByteRange;
|
return sentEntireByteRange;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2026,7 +2026,7 @@ public class DataNode extends Configured
|
||||||
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
out = new DataOutputStream(new BufferedOutputStream(baseStream,
|
||||||
HdfsConstants.SMALL_BUFFER_SIZE));
|
HdfsConstants.SMALL_BUFFER_SIZE));
|
||||||
blockSender = new BlockSender(b, 0, b.getNumBytes(),
|
blockSender = new BlockSender(b, 0, b.getNumBytes(),
|
||||||
false, false, false, DataNode.this);
|
false, false, false, DataNode.this, null);
|
||||||
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
|
@ -597,7 +597,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
try {
|
try {
|
||||||
// check if the block exists or not
|
// check if the block exists or not
|
||||||
blockSender = new BlockSender(block, 0, -1, false, false, false,
|
blockSender = new BlockSender(block, 0, -1, false, false, false,
|
||||||
datanode);
|
datanode, null);
|
||||||
|
|
||||||
// set up response stream
|
// set up response stream
|
||||||
OutputStream baseStream = NetUtils.getOutputStream(
|
OutputStream baseStream = NetUtils.getOutputStream(
|
||||||
|
|
Loading…
Reference in New Issue