HDFS-3721. hsync support broke wire compatibility. Contributed by Todd Lipcon and Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1371496 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2012-08-09 21:33:20 +00:00
parent 2b97a4cce4
commit 49e89472a9
7 changed files with 557 additions and 447 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -422,6 +422,8 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3710. libhdfs misuses O_RDONLY/WRONLY/RDWR. (Andy Isaacson via atm)
+HDFS-3721. hsync support broke wire compatibility. (todd and atm)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -30,7 +30,6 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -126,7 +125,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes written in current block
-private int packetSize = 0; // write packet size, including the header.
+private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
@@ -147,28 +146,31 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
-/** buffer for accumulating packet checksum and data */
-ByteBuffer buffer; // wraps buf, only one of these two may be non-null
byte[] buf;
/**
* buf is pointed into like follows:
*  (C is checksum data, D is payload data)
*
-* [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
-*        ^    ^               ^               ^
-*        |    checksumPos     dataStart       dataPos
-*   checksumStart
+* [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+*           ^        ^               ^               ^
+*           |        checksumPos     dataStart       dataPos
+*      checksumStart
+*
+* Right before sending, we move the checksum data to immediately precede
+* the actual data, and then insert the header into the buffer immediately
+* preceding the checksum data, so we make sure to keep enough space in
+* front of the checksum data to support the largest conceivable header.
*/
int checksumStart;
+int checksumPos;
int dataStart;
int dataPos;
-int checksumPos;
private static final long HEART_BEAT_SEQNO = -1L;
/**
-* create a heartbeat packet
+* Create a heartbeat packet.
*/
Packet() {
this.lastPacketInBlock = false;
@@ -176,17 +178,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO;
-buffer = null;
-int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
-buf = new byte[packetSize];
+buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-checksumStart = dataStart = packetSize;
-checksumPos = checksumStart;
-dataPos = dataStart;
+checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
maxChunks = 0;
}
-// create a new packet
+/**
+* Create a new packet.
+*
+* @param pktSize maximum size of the packet, including checksum data and actual data.
+* @param chunksPerPkt maximum number of chunks per packet.
+* @param offsetInBlock offset in bytes into the HDFS block.
+*/
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
this.numChunks = 0;
@@ -194,25 +198,24 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
this.seqno = currentSeqno;
currentSeqno++;
-buffer = null;
-buf = new byte[pktSize];
+buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
-checksumStart = PacketHeader.PKT_HEADER_LEN;
+checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
-dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
void writeData(byte[] inarray, int off, int len) {
-if ( dataPos + len > buf.length) {
+if (dataPos + len > buf.length) {
throw new BufferOverflowException();
}
System.arraycopy(inarray, off, buf, dataPos, len);
dataPos += len;
}
void writeChecksum(byte[] inarray, int off, int len) {
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
@@ -221,45 +224,38 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
/**
-* Returns ByteBuffer that contains one full packet, including header.
+* Write the full packet, including the header, to the given output stream.
*/
-ByteBuffer getBuffer() {
-/* Once this is called, no more data can be added to the packet.
-* setting 'buf' to null ensures that.
-* This is called only when the packet is ready to be sent.
-*/
-if (buffer != null) {
-return buffer;
-}
-//prepare the header and close any gap between checksum and data.
-int dataLen = dataPos - dataStart;
-int checksumLen = checksumPos - checksumStart;
-if (checksumPos != dataStart) {
-/* move the checksum to cover the gap.
-* This can happen for the last packet.
-*/
-System.arraycopy(buf, checksumStart, buf,
-dataStart - checksumLen , checksumLen);
-}
-int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
-//normally dataStart == checksumPos, i.e., offset is zero.
-buffer = ByteBuffer.wrap(
-buf, dataStart - checksumPos,
-PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
-buf = null;
-buffer.mark();
+void writeTo(DataOutputStream stm) throws IOException {
+final int dataLen = dataPos - dataStart;
+final int checksumLen = checksumPos - checksumStart;
+final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
PacketHeader header = new PacketHeader(
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-header.putInBuffer(buffer);
-buffer.reset();
-return buffer;
+if (checksumPos != dataStart) {
+// Move the checksum to cover the gap. This can happen for the last
+// packet or during an hflush/hsync call.
+System.arraycopy(buf, checksumStart, buf,
+dataStart - checksumLen , checksumLen);
+checksumPos = dataStart;
+checksumStart = checksumPos - checksumLen;
+}
+final int headerStart = checksumStart - header.getSerializedSize();
+assert checksumStart + 1 >= header.getSerializedSize();
+assert checksumPos == dataStart;
+assert headerStart >= 0;
+assert headerStart + header.getSerializedSize() == checksumStart;
+// Copy the header data into the buffer immediately preceding the checksum
+// data.
+System.arraycopy(header.getBytes(), 0, buf, headerStart,
+header.getSerializedSize());
+// Write the now contiguous full packet to the output stream.
+stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
}
// get the packet's last byte's offset in the block
@@ -502,8 +498,6 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
// send the packet
-ByteBuffer buf = one.getBuffer();
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
@@ -519,8 +513,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
// write out data to remote datanode
try {
-blockStream.write(buf.array(), buf.position(), buf.remaining());
+one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
@@ -1358,9 +1352,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
-int n = PacketHeader.PKT_HEADER_LEN;
-chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
-packetSize = n + chunkSize*chunksPerPacket;
+chunksPerPacket = Math.max(psize/chunkSize, 1);
+packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
@@ -1474,8 +1467,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
-currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
-bytesCurBlock);
+currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1751,8 +1743,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
-currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
-bytesCurBlock);
+currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -1805,8 +1796,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
-packetSize = PacketHeader.PKT_HEADER_LEN +
-(checksum.getBytesPerChecksum() +
+packetSize = (checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
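The net effect of the DFSOutputStream changes above: a Packet no longer wraps itself in a ByteBuffer sized for a fixed-length header. It reserves PKT_MAX_HEADER_LEN bytes of slack at the front of buf, slides the checksums back to abut the data, and writes the variable-length header immediately before them, so one contiguous write() sends the whole packet. A minimal sketch of that buffer discipline, with hypothetical names (SimplePacket, MAX_HEADER_LEN) and a caller-supplied pre-serialized header standing in for PacketHeader:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    /** Hypothetical stand-in for the Packet.writeTo() logic above. */
    class SimplePacket {
        static final int MAX_HEADER_LEN = 33; // assumed bound, like PKT_MAX_HEADER_LEN

        final byte[] buf;
        int checksumStart, checksumPos, dataStart, dataPos;

        SimplePacket(int pktSize, int chunksPerPkt, int checksumSize) {
            buf = new byte[MAX_HEADER_LEN + pktSize];
            checksumStart = checksumPos = MAX_HEADER_LEN;
            dataStart = dataPos = checksumStart + chunksPerPkt * checksumSize;
        }

        void writeTo(DataOutputStream stm, byte[] header) throws IOException {
            int dataLen = dataPos - dataStart;
            int checksumLen = checksumPos - checksumStart;
            if (checksumPos != dataStart) {
                // Slide the checksums back so they end exactly where the data begins.
                System.arraycopy(buf, checksumStart, buf,
                    dataStart - checksumLen, checksumLen);
                checksumPos = dataStart;
                checksumStart = checksumPos - checksumLen;
            }
            // Place the variable-length header immediately before the checksums.
            int headerStart = checksumStart - header.length;
            System.arraycopy(header, 0, buf, headerStart, header.length);
            // One contiguous write: header, checksums, data.
            stm.write(buf, headerStart, header.length + checksumLen + dataLen);
        }

        public static void main(String[] args) throws IOException {
            SimplePacket p = new SimplePacket(16, 2, 4);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            p.writeTo(new DataOutputStream(bos), new byte[] {9, 9});
            System.out.println("bytes on wire: " + bos.size()); // header only: 2
        }
    }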

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java

@@ -33,12 +33,12 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
@@ -48,14 +48,11 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.base.Preconditions;
/**
* This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -93,11 +90,9 @@ public class RemoteBlockReader2 implements BlockReader {
private final ReadableByteChannel in;
private DataChecksum checksum;
-private PacketHeader curHeader;
-private ByteBuffer curPacketBuf = null;
+private PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curDataSlice = null;
/** offset in block of the last chunk received */
private long lastSeqNo = -1;
@@ -105,10 +100,6 @@ public class RemoteBlockReader2 implements BlockReader {
private long startOffset;
private final String filename;
-private static DirectBufferPool bufferPool = new DirectBufferPool();
-private final ByteBuffer headerBuf = ByteBuffer.allocate(
-PacketHeader.PKT_HEADER_LEN);
private final int bytesPerChecksum;
private final int checksumSize;
@@ -132,7 +123,7 @@ public class RemoteBlockReader2 implements BlockReader {
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
-if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -149,7 +140,7 @@ public class RemoteBlockReader2 implements BlockReader {
@Override
public int read(ByteBuffer buf) throws IOException {
-if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -167,11 +158,13 @@ public class RemoteBlockReader2 implements BlockReader {
}
private void readNextPacket() throws IOException {
-Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-//Read packet headers.
-readPacketHeader();
+packetReceiver.receiveNextPacket(in);
+PacketHeader curHeader = packetReceiver.getHeader();
+curDataSlice = packetReceiver.getDataSlice();
+assert curDataSlice.capacity() == curHeader.getDataLen();
if (LOG.isTraceEnabled()) {
LOG.trace("DFSClient readNextPacket got header " + curHeader);
}
@@ -185,17 +178,20 @@ public class RemoteBlockReader2 implements BlockReader {
if (curHeader.getDataLen() > 0) {
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
int checksumsLen = chunks * checksumSize;
-int bufsize = checksumsLen + curHeader.getDataLen();
+assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+"checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+" checksumsLen=" + checksumsLen;
-resetPacketBuffer(checksumsLen, curHeader.getDataLen());
lastSeqNo = curHeader.getSeqno();
-if (bufsize > 0) {
-readChannelFully(in, curPacketBuf);
-curPacketBuf.flip();
-if (verifyChecksum) {
-verifyPacketChecksums();
-}
+if (verifyChecksum && curDataSlice.remaining() > 0) {
+// N.B.: the checksum error offset reported here is actually
+// relative to the start of the block, not the start of the file.
+// This is slightly misleading, but preserves the behavior from
+// the older BlockReader.
+checksum.verifyChunkedSums(curDataSlice,
+packetReceiver.getChecksumSlice(),
+filename, curHeader.getOffsetInBlock());
}
bytesNeededToFinish -= curHeader.getDataLen();
}
@@ -218,40 +214,7 @@
}
}
}
-private void verifyPacketChecksums() throws ChecksumException {
-// N.B.: the checksum error offset reported here is actually
-// relative to the start of the block, not the start of the file.
-// This is slightly misleading, but preserves the behavior from
-// the older BlockReader.
-checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
-filename, curHeader.getOffsetInBlock());
-}
-private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
-throws IOException {
-while (buf.remaining() > 0) {
-int n = ch.read(buf);
-if (n < 0) {
-throw new IOException("Premature EOF reading from " + ch);
-}
-}
-}
-private void resetPacketBuffer(int checksumsLen, int dataLen) {
-int packetLen = checksumsLen + dataLen;
-if (curPacketBuf == null ||
-curPacketBuf.capacity() < packetLen) {
-returnPacketBufToPool();
-curPacketBuf = bufferPool.getBuffer(packetLen);
-}
-curPacketBuf.position(checksumsLen);
-curDataSlice = curPacketBuf.slice();
-curDataSlice.limit(dataLen);
-curPacketBuf.clear();
-curPacketBuf.limit(checksumsLen + dataLen);
-}
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
@@ -272,23 +235,14 @@
return nSkipped;
}
-private void readPacketHeader() throws IOException {
-headerBuf.clear();
-readChannelFully(in, headerBuf);
-headerBuf.flip();
-if (curHeader == null) curHeader = new PacketHeader();
-curHeader.readFields(headerBuf);
-}
private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Reading empty packet at end of read");
}
-headerBuf.clear();
-readChannelFully(in, headerBuf);
-headerBuf.flip();
-PacketHeader trailer = new PacketHeader();
-trailer.readFields(headerBuf);
+packetReceiver.receiveNextPacket(in);
+PacketHeader trailer = packetReceiver.getHeader();
if (!trailer.isLastPacketInBlock() ||
trailer.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -321,7 +275,7 @@
@Override
public synchronized void close() throws IOException {
-returnPacketBufToPool();
+packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -332,24 +286,6 @@
// in will be closed when its Socket is closed.
}
-@Override
-protected void finalize() throws Throwable {
-try {
-// just in case it didn't get closed, we
-// may as well still try to return the buffer
-returnPacketBufToPool();
-} finally {
-super.finalize();
-}
-}
-private void returnPacketBufToPool() {
-if (curPacketBuf != null) {
-bufferPool.returnBuffer(curPacketBuf);
-curPacketBuf = null;
-}
-}
/**
* Take the socket used to talk to the DN.
*/
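After this change the client read path follows a plain refill-on-demand loop: a reusable PacketReceiver parses each packet into header, checksum, and data slices, and read() drains the current data slice before asking for the next one. A hedged sketch of that pattern with hypothetical types (SliceSource, SliceReader), not the actual HDFS classes:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Hypothetical supplier of per-packet data slices (a PacketReceiver stand-in). */
    interface SliceSource {
        ByteBuffer nextDataSlice() throws IOException;
    }

    /** Drains one block's data, refilling from the source only when needed. */
    class SliceReader {
        private final SliceSource source;
        private ByteBuffer curDataSlice; // null until the first packet arrives
        private long bytesNeededToFinish;

        SliceReader(SliceSource source, long totalBytes) {
            this.source = source;
            this.bytesNeededToFinish = totalBytes;
        }

        int read(byte[] out, int off, int len) throws IOException {
            if (curDataSlice == null
                || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
                curDataSlice = source.nextDataSlice(); // parse the next packet
                bytesNeededToFinish -= curDataSlice.remaining();
            }
            if (curDataSlice.remaining() == 0) {
                return -1; // end of block
            }
            int n = Math.min(len, curDataSlice.remaining());
            curDataSlice.get(out, off, n);
            return n;
        }
    }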

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java

@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* Header data for each packet that goes through the read/write pipelines.
+* Includes all of the information about the packet, excluding checksums and
+* actual data.
+*
+* This data includes:
+* - the offset in bytes into the HDFS block of the data in this packet
+* - the sequence number of this packet in the pipeline
+* - whether or not this is the last packet in the pipeline
+* - the length of the data in this packet
+* - whether or not this packet should be synced by the DNs.
+*
+* When serialized, this header is written out as a protocol buffer, preceded
+* by a 4-byte integer representing the full packet length, and a 2-byte short
+* representing the header length.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PacketHeader {
/** Header size for a packet */
-private static final int PROTO_SIZE =
+private static final int MAX_PROTO_SIZE =
PacketHeaderProto.newBuilder()
.setOffsetInBlock(0)
.setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
.setDataLen(0)
.setSyncBlock(false)
.build().getSerializedSize();
-public static final int PKT_HEADER_LEN =
-6 + PROTO_SIZE;
+public static final int PKT_LENGTHS_LEN =
+Ints.BYTES + Shorts.BYTES;
+public static final int PKT_MAX_HEADER_LEN =
+PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
private int packetLen;
private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
-proto = PacketHeaderProto.newBuilder()
+Preconditions.checkArgument(packetLen >= Ints.BYTES,
+"packet len %s should always be at least 4 bytes",
+packetLen);
+PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
-.setDataLen(dataLen)
-.setSyncBlock(syncBlock)
-.build();
+.setDataLen(dataLen);
+if (syncBlock) {
+// Only set syncBlock if it is specified.
+// This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+// because it changes the length of the packet header, and BlockReceiver
+// in that version did not support variable-length headers.
+builder.setSyncBlock(syncBlock);
+}
+proto = builder.build();
}
public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
-"Header data: " +
+" header data: " +
proto.toString();
}
+public void setFieldsFromData(
+int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+this.packetLen = packetLen;
+proto = PacketHeaderProto.parseFrom(headerData);
+}
public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt();
short protoLen = buf.getShort();
@@ -110,14 +147,21 @@
proto = PacketHeaderProto.parseFrom(data);
}
+/**
+* @return the number of bytes necessary to write out this header,
+* including the length-prefixing of the payload and header
+*/
+public int getSerializedSize() {
+return PKT_LENGTHS_LEN + proto.getSerializedSize();
+}
/**
* Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available.
*/
public void putInBuffer(final ByteBuffer buf) {
-assert proto.getSerializedSize() == PROTO_SIZE
-: "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+: "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
try {
buf.putInt(packetLen);
buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@
}
public void write(DataOutputStream out) throws IOException {
-assert proto.getSerializedSize() == PROTO_SIZE
-: "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+: "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
out.writeInt(packetLen);
out.writeShort(proto.getSerializedSize());
proto.writeTo(out);
}
+public byte[] getBytes() {
+ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+putInBuffer(buf);
+return buf.array();
+}
/**
* Perform a sanity check on the packet, returning true if it is sane.

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (new file)

@@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
/**
* Class to handle reading packets one-at-a-time from the wire.
* These packets are used both for reading and writing data to/from
* DataNodes.
*/
@InterfaceAudience.Private
public class PacketReceiver implements Closeable {
/**
* The max size of any single packet. This prevents OOMEs when
* invalid data is sent.
*/
private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
static Log LOG = LogFactory.getLog(PacketReceiver.class);
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private final boolean useDirectBuffers;
/**
* Internal buffer for reading the length prefixes at the start of
* the packet.
*/
private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
PacketHeader.PKT_LENGTHS_LEN);
/**
* The entirety of the most recently read packet, excepting the
* length prefixes.
*/
private ByteBuffer curPacketBuf = null;
/**
* A slice of {@link #curPacketBuf} which contains just the checksums.
*/
private ByteBuffer curChecksumSlice = null;
/**
* A slice of {@link #curPacketBuf} which contains just the data.
*/
private ByteBuffer curDataSlice = null;
/**
* The packet header of the most recently read packet.
*/
private PacketHeader curHeader;
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
}
public PacketHeader getHeader() {
return curHeader;
}
public ByteBuffer getDataSlice() {
return curDataSlice;
}
public ByteBuffer getChecksumSlice() {
return curChecksumSlice;
}
/**
* Reads all of the data for the next packet into the appropriate buffers.
*
* The data slice and checksum slice members will be set to point to the
* user data and corresponding checksums. The header will be parsed and
* set.
*/
public void receiveNextPacket(ReadableByteChannel in) throws IOException {
doRead(in, null);
}
/**
* @see #receiveNextPacket(ReadableByteChannel)
*/
public void receiveNextPacket(InputStream in) throws IOException {
doRead(null, in);
}
private void doRead(ReadableByteChannel ch, InputStream in)
throws IOException {
// Each packet looks like:
//   PLEN    HLEN      HEADER    CHECKSUMS  DATA
//   32-bit  16-bit   <protobuf> <variable length>
//
// PLEN: Payload length
// = length(PLEN) + length(CHECKSUMS) + length(DATA)
// This length includes its own encoded length in
// the sum for historical reasons.
//
// HLEN: Header length
// = length(HEADER)
//
// HEADER: the actual packet header fields, encoded in protobuf
// CHECKSUMS: the crcs for the data chunk. May be missing if
// checksums were not requested
// DATA the actual block data
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
lengthPrefixBuf.clear();
doReadFully(ch, in, lengthPrefixBuf);
lengthPrefixBuf.flip();
int payloadLen = lengthPrefixBuf.getInt();
if (payloadLen < Ints.BYTES) {
// The "payload length" includes its own length. Therefore it
// should never be less than 4 bytes
throw new IOException("Invalid payload length " +
payloadLen);
}
int dataPlusChecksumLen = payloadLen - Ints.BYTES;
int headerLen = lengthPrefixBuf.getShort();
if (headerLen < 0) {
throw new IOException("Invalid header length " + headerLen);
}
if (LOG.isTraceEnabled()) {
LOG.trace("readNextPacket: dataPlusChecksumLen = " + dataPlusChecksumLen +
" headerLen = " + headerLen);
}
// Sanity check the buffer size so we don't allocate too much memory
// and OOME.
int totalLen = payloadLen + headerLen;
if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
throw new IOException("Incorrect value for packet payload size: " +
payloadLen);
}
// Make sure we have space for the whole packet, and
// read it.
reallocPacketBuf(dataPlusChecksumLen + headerLen);
curPacketBuf.clear();
curPacketBuf.limit(dataPlusChecksumLen + headerLen);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
// Extract the header from the front of the buffer.
byte[] headerBuf = new byte[headerLen];
curPacketBuf.get(headerBuf);
if (curHeader == null) {
curHeader = new PacketHeader();
}
curHeader.setFieldsFromData(dataPlusChecksumLen, headerBuf);
// Compute the sub-slices of the packet
int checksumLen = dataPlusChecksumLen - curHeader.getDataLen();
if (checksumLen < 0) {
throw new IOException("Invalid packet: data length in packet header " +
"exceeds data length received. dataPlusChecksumLen=" +
dataPlusChecksumLen + " header: " + curHeader);
}
reslicePacket(headerLen, checksumLen, curHeader.getDataLen());
}
/**
* Rewrite the last-read packet on the wire to the given output stream.
*/
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
Preconditions.checkState(!useDirectBuffers,
"Currently only supported for non-direct buffers");
assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
mirrorOut.write(lengthPrefixBuf.array(),
lengthPrefixBuf.arrayOffset(),
lengthPrefixBuf.capacity());
mirrorOut.write(curPacketBuf.array(),
curPacketBuf.arrayOffset(),
curPacketBuf.remaining());
}
private static void doReadFully(ReadableByteChannel ch, InputStream in,
ByteBuffer buf) throws IOException {
if (ch != null) {
readChannelFully(ch, buf);
} else {
Preconditions.checkState(!buf.isDirect(),
"Must not use direct buffers with InputStream API");
IOUtils.readFully(in, buf.array(),
buf.arrayOffset() + buf.position(),
buf.remaining());
buf.position(buf.position() + buf.remaining());
}
}
private void reslicePacket(
int headerLen, int checksumsLen, int dataLen) {
assert dataLen >= 0 : "invalid datalen: " + dataLen;
assert curPacketBuf.position() == headerLen;
assert checksumsLen + dataLen == curPacketBuf.remaining() :
"headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
" rem=" + curPacketBuf.remaining();
curPacketBuf.position(headerLen);
curPacketBuf.limit(headerLen + checksumsLen);
curChecksumSlice = curPacketBuf.slice();
curPacketBuf.position(headerLen + checksumsLen);
curPacketBuf.limit(headerLen + checksumsLen + dataLen);
curDataSlice = curPacketBuf.slice();
curPacketBuf.position(0);
curPacketBuf.limit(headerLen + checksumsLen + dataLen);
}
private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
throws IOException {
while (buf.remaining() > 0) {
int n = ch.read(buf);
if (n < 0) {
throw new IOException("Premature EOF reading from " + ch);
}
}
}
private void reallocPacketBuf(int atLeastCapacity) {
// Realloc the buffer if this packet is longer than the previous
// one.
if (curPacketBuf == null ||
curPacketBuf.capacity() < atLeastCapacity) {
returnPacketBufToPool();
if (useDirectBuffers) {
curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
} else {
curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
}
}
}
private void returnPacketBufToPool() {
if (curPacketBuf != null && curPacketBuf.isDirect()) {
bufferPool.returnBuffer(curPacketBuf);
curPacketBuf = null;
}
}
@Override // Closeable
public void close() {
returnPacketBufToPool();
}
@Override
protected void finalize() throws Throwable {
try {
// just in case it didn't get closed, we
// may as well still try to return the buffer
returnPacketBufToPool();
} finally {
super.finalize();
}
}
}
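A hedged usage sketch of the class above, draining one block's packets from a channel; the consume() helper and PacketDrain wrapper are hypothetical, but the PacketReceiver calls match the API introduced in this file:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;

    class PacketDrain {
        static void drain(ReadableByteChannel ch) throws IOException {
            PacketReceiver receiver = new PacketReceiver(true); // direct buffers
            try {
                do {
                    receiver.receiveNextPacket(ch); // parses header + slices
                    ByteBuffer data = receiver.getDataSlice();
                    ByteBuffer sums = receiver.getChecksumSlice();
                    consume(data, sums);
                } while (!receiver.getHeader().isLastPacketInBlock());
            } finally {
                receiver.close(); // returns any direct buffer to the pool
            }
        }

        static void consume(ByteBuffer data, ByteBuffer sums) {
            // hypothetical: verify checksums, copy data out, etc.
        }
    }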

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -34,12 +33,14 @@ import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
@@ -77,9 +78,10 @@ class BlockReceiver implements Closeable {
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
-private ByteBuffer buf; // contains one full packet.
-private int bufRead; //amount of valid data in the buf
-private int maxPacketReadLen;
+private PacketReceiver packetReceiver =
+new PacketReceiver(false);
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
@@ -248,6 +250,10 @@
*/
@Override
public void close() throws IOException {
+if (packetReceiver != null) {
+packetReceiver.close();
+}
IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
@@ -365,33 +371,24 @@
/**
* Verify multiple CRC chunks.
*/
-private void verifyChunks( byte[] dataBuf, int dataOff, int len,
-byte[] checksumBuf, int checksumOff )
-throws IOException {
-while (len > 0) {
-int chunkLen = Math.min(len, bytesPerChecksum);
-clientChecksum.update(dataBuf, dataOff, chunkLen);
-if (!clientChecksum.compare(checksumBuf, checksumOff)) {
-if (srcDataNode != null) {
-try {
-LOG.info("report corrupt block " + block + " from datanode " +
-srcDataNode + " to namenode");
-datanode.reportRemoteBadBlock(srcDataNode, block);
-} catch (IOException e) {
-LOG.warn("Failed to report bad block " + block +
-" from datanode " + srcDataNode + " to namenode");
-}
+private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
+throws IOException {
+try {
+clientChecksum.verifyChunkedSums(dataBuf, checksumBuf, clientname, 0);
+} catch (ChecksumException ce) {
+LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
+if (srcDataNode != null) {
+try {
+LOG.info("report corrupt block " + block + " from datanode " +
+srcDataNode + " to namenode");
+datanode.reportRemoteBadBlock(srcDataNode, block);
+} catch (IOException e) {
+LOG.warn("Failed to report bad block " + block +
+" from datanode " + srcDataNode + " to namenode");
+}
}
-throw new IOException("Unexpected checksum mismatch " +
-"while writing " + block + " from " + inAddr);
-}
-clientChecksum.reset();
-dataOff += chunkLen;
-checksumOff += checksumSize;
-len -= chunkLen;
+throw new IOException("Unexpected checksum mismatch " +
+"while writing " + block + " from " + inAddr);
}
}
@@ -403,163 +400,24 @@
* This does not verify the original checksums, under the assumption
* that they have already been validated.
*/
-private void translateChunks( byte[] dataBuf, int dataOff, int len,
-byte[] checksumBuf, int checksumOff ) {
-if (len == 0) return;
-int numChunks = (len - 1)/bytesPerChecksum + 1;
-diskChecksum.calculateChunkedSums(
-ByteBuffer.wrap(dataBuf, dataOff, len),
-ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
+private void translateChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
+diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
-/**
-* Makes sure buf.position() is zero without modifying buf.remaining().
-* It moves the data if position needs to be changed.
-*/
-private void shiftBufData() {
-if (bufRead != buf.limit()) {
-throw new IllegalStateException("bufRead should be same as " +
-"buf.limit()");
-}
-//shift the remaining data on buf to the front
-if (buf.position() > 0) {
-int dataLeft = buf.remaining();
-if (dataLeft > 0) {
-byte[] b = buf.array();
-System.arraycopy(b, buf.position(), b, 0, dataLeft);
-}
-buf.position(0);
-bufRead = dataLeft;
-buf.limit(bufRead);
-}
-}
-/**
-* reads upto toRead byte to buf at buf.limit() and increments the limit.
-* throws an IOException if read does not succeed.
-*/
-private int readToBuf(int toRead) throws IOException {
-if (toRead < 0) {
-toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
-- buf.limit();
-}
-int nRead = in.read(buf.array(), buf.limit(), toRead);
-if (nRead < 0) {
-throw new EOFException("while trying to read " + toRead + " bytes");
-}
-bufRead = buf.limit() + nRead;
-buf.limit(bufRead);
-return nRead;
-}
-/**
-* Reads (at least) one packet and returns the packet length.
-* buf.position() points to the start of the packet and
-* buf.limit() point to the end of the packet. There could
-* be more data from next packet in buf.<br><br>
-*
-* It tries to read a full packet with single read call.
-* Consecutive packets are usually of the same length.
-*/
-private void readNextPacket() throws IOException {
-/* This dances around buf a little bit, mainly to read
-* full packet with single read and to accept arbitrary size
-* for next packet at the same time.
-*/
-if (buf == null) {
-/* initialize buffer to the best guess size:
-* 'chunksPerPacket' calculation here should match the same
-* calculation in DFSClient to make the guess accurate.
-*/
-int chunkSize = bytesPerChecksum + checksumSize;
-int chunksPerPacket = (datanode.getDnConf().writePacketSize - PacketHeader.PKT_HEADER_LEN
-+ chunkSize - 1)/chunkSize;
-buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
-Math.max(chunksPerPacket, 1) * chunkSize);
-buf.limit(0);
-}
-// See if there is data left in the buffer :
-if (bufRead > buf.limit()) {
-buf.limit(bufRead);
-}
-while (buf.remaining() < HdfsConstants.BYTES_IN_INTEGER) {
-if (buf.position() > 0) {
-shiftBufData();
-}
-readToBuf(-1);
-}
-/* We mostly have the full packet or at least enough for an int
-*/
-buf.mark();
-int payloadLen = buf.getInt();
-buf.reset();
-// check corrupt values for pktLen, 100MB upper limit should be ok?
-if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
-throw new IOException("Incorrect value for packet payload : " +
-payloadLen);
-}
-// Subtract BYTES_IN_INTEGER since that accounts for the payloadLen that
-// we read above.
-int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN
-- HdfsConstants.BYTES_IN_INTEGER;
-if (buf.remaining() < pktSize) {
-//we need to read more data
-int toRead = pktSize - buf.remaining();
-// first make sure buf has enough space.
-int spaceLeft = buf.capacity() - buf.limit();
-if (toRead > spaceLeft && buf.position() > 0) {
-shiftBufData();
-spaceLeft = buf.capacity() - buf.limit();
-}
-if (toRead > spaceLeft) {
-byte oldBuf[] = buf.array();
-int toCopy = buf.limit();
-buf = ByteBuffer.allocate(toCopy + toRead);
-System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
-buf.limit(toCopy);
-}
-//now read:
-while (toRead > 0) {
-toRead -= readToBuf(toRead);
-}
-}
-if (buf.remaining() > pktSize) {
-buf.limit(buf.position() + pktSize);
-}
-if (pktSize > maxPacketReadLen) {
-maxPacketReadLen = pktSize;
-}
-}
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
-// read the next packet
-readNextPacket();
+packetReceiver.receiveNextPacket(in);
-buf.mark();
-PacketHeader header = new PacketHeader();
-header.readFields(buf);
-int endOfHeader = buf.position();
-buf.reset();
+PacketHeader header = packetReceiver.getHeader();
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
": " + header);
}
// Sanity check the header
if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
@@ -574,38 +432,12 @@
header.getDataLen());
}
-return receivePacket(
-header.getOffsetInBlock(),
-header.getSeqno(),
-header.isLastPacketInBlock(),
-header.getDataLen(),
-header.getSyncBlock(),
-endOfHeader);
-}
+long offsetInBlock = header.getOffsetInBlock();
+long seqno = header.getSeqno();
+boolean lastPacketInBlock = header.isLastPacketInBlock();
+int len = header.getDataLen();
+boolean syncBlock = header.getSyncBlock();
-/**
-* Write the received packet to disk (data only)
-*/
-private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
-int numBytesToDisk) throws IOException {
-out.write(pktBuf, startByteToDisk, numBytesToDisk);
-}
-/**
-* Receives and processes a packet. It can contain many chunks.
-* returns the number of data bytes that the packet has.
-*/
-private int receivePacket(long offsetInBlock, long seqno,
-boolean lastPacketInBlock, int len, boolean syncBlock,
-int endOfHeader) throws IOException {
-if (LOG.isDebugEnabled()){
-LOG.debug("Receiving one packet for block " + block +
-" of length " + len +
-" seqno " + seqno +
-" offsetInBlock " + offsetInBlock +
-" syncBlock " + syncBlock +
-" lastPacketInBlock " + lastPacketInBlock);
-}
// make sure the block gets sync'ed upon close
this.syncOnClose |= syncBlock && lastPacketInBlock;
@@ -625,14 +457,15 @@
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
-mirrorOut.write(buf.array(), buf.position(), buf.remaining());
+packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
}
}
-buf.position(endOfHeader);
+ByteBuffer dataBuf = packetReceiver.getDataSlice();
+ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) {
@@ -646,18 +479,11 @@
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
-if ( buf.remaining() != (checksumLen + len)) {
-throw new IOException("Data remaining in packet does not match" +
-"sum of checksumLen and dataLen " +
-" size remaining: " + buf.remaining() +
-" data len: " + len +
-" checksum Len: " + checksumLen);
-}
-int checksumOff = buf.position();
-int dataOff = checksumOff + checksumLen;
-byte pktBuf[] = buf.array();
-buf.position(buf.limit()); // move to the end of the data.
+if ( checksumBuf.capacity() != checksumLen) {
+throw new IOException("Length of checksums in packet " +
+checksumBuf.capacity() + " does not match calculated checksum " +
+"length " + checksumLen);
+}
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
@@ -667,11 +493,11 @@
* checksum.
*/
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
-verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+verifyChunks(dataBuf, checksumBuf);
if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
-translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
+translateChunks(dataBuf, checksumBuf);
}
}
@@ -700,9 +526,13 @@
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
}
-int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
+int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+  + dataBuf.arrayOffset() + dataBuf.position();
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
+// Write data to disk.
+out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -714,7 +544,7 @@
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
-partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
+partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length
@@ -726,11 +556,12 @@
partialCrc = null;
} else {
lastChunkChecksum = Arrays.copyOfRange(
-pktBuf,
-checksumOff + checksumLen - checksumSize,
-checksumOff + checksumLen
-);
-checksumOut.write(pktBuf, checksumOff, checksumLen);
+checksumBuf.array(),
+checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
+checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
+checksumOut.write(checksumBuf.array(),
+checksumBuf.arrayOffset() + checksumBuf.position(),
+checksumLen);
}
/// flush entire packet, sync unless close() will sync
flushOrSync(syncBlock && !lastPacketInBlock);
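The rewritten verifyChunks() above hands the whole data and checksum slices to DataChecksum.verifyChunkedSums() rather than looping chunk by chunk itself. A self-contained approximation of that chunked check, using plain java.util.zip.CRC32 (the real code supports several checksum types and reports richer error offsets; this only illustrates the per-chunk compare):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    class ChunkedVerify {
        /** Verify one 4-byte CRC32 per chunk of at most bytesPerChecksum bytes. */
        static void verify(ByteBuffer data, ByteBuffer sums, int bytesPerChecksum)
                throws IOException {
            ByteBuffer d = data.duplicate(); // leave caller positions untouched
            ByteBuffer s = sums.duplicate();
            CRC32 crc = new CRC32();
            byte[] chunk = new byte[bytesPerChecksum];
            while (d.hasRemaining()) {
                int n = Math.min(bytesPerChecksum, d.remaining());
                d.get(chunk, 0, n);
                crc.reset();
                crc.update(chunk, 0, n);
                if ((int) crc.getValue() != s.getInt()) {
                    throw new IOException("checksum mismatch");
                }
            }
        }

        public static void main(String[] args) throws IOException {
            byte[] data = "hello, hdfs".getBytes();
            CRC32 c = new CRC32();
            c.update(data, 0, data.length);
            ByteBuffer sums = (ByteBuffer) ByteBuffer.allocate(4)
                .putInt((int) c.getValue()).flip();
            verify(ByteBuffer.wrap(data), sums, 512); // one chunk
            System.out.println("ok");
        }
    }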

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -62,40 +62,29 @@ import org.apache.hadoop.util.DataChecksum;
* </pre>
* An empty packet is sent to mark the end of block and read completion.
*
-* PACKET Contains a packet header, checksum and data. Amount of data
-* carried is set by BUFFER_SIZE.
-* <pre>
-*   +-----------------------------------------------------+
-*   | 4 byte packet length (excluding packet header)      |
-*   +-----------------------------------------------------+
-*   | 8 byte offset in the block | 8 byte sequence number |
-*   +-----------------------------------------------------+
-*   | 1 byte isLastPacketInBlock                          |
-*   +-----------------------------------------------------+
-*   | 4 byte Length of actual data                        |
-*   +-----------------------------------------------------+
-*   | x byte checksum data. x is defined below            |
-*   +-----------------------------------------------------+
-*   | actual data ......                                  |
-*   +-----------------------------------------------------+
-*
-* Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
-* A checksum is calculated for each chunk.
-*
-* x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
-*     CHECKSUM_SIZE
-*
-* CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
-* </pre>
+* PACKET Contains a packet header, checksum and data. Amount of data
+* carried is set by BUFFER_SIZE.
+* <pre>
+*   +-----------------------------------------------------+
+*   | Variable length header. See {@link PacketHeader}    |
+*   +-----------------------------------------------------+
+*   | x byte checksum data. x is defined below            |
+*   +-----------------------------------------------------+
+*   | actual data ......                                  |
+*   +-----------------------------------------------------+
+*
+* Data is made of Chunks. Each chunk is of length <= BYTES_PER_CHECKSUM.
+* A checksum is calculated for each chunk.
+*
+* x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
+*     CHECKSUM_SIZE
+*
+* CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
+* </pre>
*
* The client reads data until it receives a packet with
* "LastPacketInBlock" set to true or with a zero length. If there is
-* no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
-* <pre>
-*   +------------------------------+
-*   | 2 byte OP_STATUS_CHECKSUM_OK |
-*   +------------------------------+
-*   </pre>
+* no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK.
*/
class BlockSender implements java.io.Closeable {
static final Log LOG = DataNode.LOG;
@@ -448,8 +437,22 @@ class BlockSender implements java.io.Closeable {
int packetLen = dataLen + checksumDataLen + 4;
boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;
-writePacketHeader(pkt, dataLen, packetLen);
+// The packet buffer is organized as follows:
+// _______HHHHCCCCD?D?D?D?
+//        ^   ^
+//        |   \ checksumOff
+//        \ headerOff
+// _ padding, since the header is variable-length
+// H = header and length prefixes
+// C = checksums
+// D? = data, if transferTo is false.
+int headerLen = writePacketHeader(pkt, dataLen, packetLen);
+// Per above, the header doesn't start at the beginning of the
+// buffer
+int headerOff = pkt.position() - headerLen;
int checksumOff = pkt.position();
byte[] buf = pkt.array();
@@ -479,7 +482,8 @@ class BlockSender implements java.io.Closeable {
try {
if (transferTo) {
SocketOutputStream sockOut = (SocketOutputStream)out;
-sockOut.write(buf, 0, dataOff); // First write checksum
+// First write header and checksums
+sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
@@ -492,7 +496,7 @@ class BlockSender implements java.io.Closeable {
blockInPosition += dataLen;
} else {
// normal transfer
-out.write(buf, 0, dataOff + dataLen);
+out.write(buf, headerOff, dataOff + dataLen - headerOff);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException) {
@@ -625,7 +629,7 @@ class BlockSender implements java.io.Closeable {
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
-int pktSize = PacketHeader.PKT_HEADER_LEN;
+int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream;
@@ -636,15 +640,15 @@ class BlockSender implements java.io.Closeable {
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
// Smaller packet size to only hold checksum when doing transferTo
-pktSize += checksumSize * maxChunksPerPacket;
+pktBufSize += checksumSize * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
numberOfChunks(HdfsConstants.IO_FILE_BUFFER_SIZE));
// Packet size includes both checksum and data
-pktSize += (chunkSize + checksumSize) * maxChunksPerPacket;
+pktBufSize += (chunkSize + checksumSize) * maxChunksPerPacket;
}
-ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
+ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
while (endOffset > offset) {
manageOsCache();
@@ -714,14 +718,19 @@
}
/**
-* Write packet header into {@code pkt}
+* Write packet header into {@code pkt},
+* return the length of the header written.
*/
-private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
+private int writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
// both syncBlock and syncPacket are false
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen, false);
+int size = header.getSerializedSize();
+pkt.position(PacketHeader.PKT_MAX_HEADER_LEN - size);
header.putInBuffer(pkt);
+return size;
}
boolean didSendEntireByteRange() {
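The BlockSender side of the fix hinges on right-aligning the variable-length header inside a fixed PKT_MAX_HEADER_LEN reservation, so header, checksums, and data end up contiguous and can go out in a single write starting at headerOff. A small demo of that arithmetic with made-up sizes (the real PKT_MAX_HEADER_LEN is derived from the protobuf's maximum serialized size):

    import java.nio.ByteBuffer;

    class PacketLayoutDemo {
        static final int PKT_MAX_HEADER_LEN = 33; // assumed, like PacketHeader's

        public static void main(String[] args) {
            int headerLen = 27;                   // assumed serialized header size
            int checksumLen = 8, dataLen = 1024;
            ByteBuffer pkt = ByteBuffer.allocate(
                PKT_MAX_HEADER_LEN + checksumLen + dataLen);

            // Right-align the header in the reservation, as writePacketHeader does.
            pkt.position(PKT_MAX_HEADER_LEN - headerLen);
            int headerOff = pkt.position();
            pkt.position(pkt.position() + headerLen); // header.putInBuffer(pkt)
            int checksumOff = pkt.position();         // checksums start here
            // ... checksums and (if not using transferTo) data would follow ...

            int sendLen = headerLen + checksumLen + dataLen;
            System.out.println("checksumOff = " + checksumOff // 33
                + ", write(buf, " + headerOff + ", " + sendLen + ")");
        }
    }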