From c5d52c7b52ebdaebc829d4b70e1dff3df5d37a30 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Thu, 5 Mar 2015 10:57:48 -0800 Subject: [PATCH] HDFS-7855. Separate class Packet from DFSOutputStream. Contributed by Li Bo. (cherry picked from commit 952640fa4cbdc23fe8781e5627c2e8eab565c535) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../apache/hadoop/hdfs/DFSOutputStream.java | 239 +++------------- .../org/apache/hadoop/hdfs/DFSPacket.java | 270 ++++++++++++++++++ .../org/apache/hadoop/hdfs/TestDFSPacket.java | 68 +++++ 4 files changed, 381 insertions(+), 198 deletions(-) mode change 100644 => 100755 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java create mode 100755 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java create mode 100755 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4491d8ada12..187be3b5615 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -410,6 +410,8 @@ Release 2.7.0 - UNRELEASED HADOOP-11648. Set DomainSocketWatcher thread name explicitly. (Liang Xie via ozawa) + HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo bia jing9) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java old mode 100644 new mode 100755 index 60a9b3755e8..0a8720a4bdf --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -30,7 +30,6 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.BufferOverflowException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; @@ -79,7 +78,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; @@ -160,9 +158,9 @@ public class DFSOutputStream extends FSOutputSummer private final int bytesPerChecksum; // both dataQueue and ackQueue are protected by dataQueue lock - private final LinkedList dataQueue = new LinkedList(); - private final LinkedList ackQueue = new LinkedList(); - private Packet currentPacket = null; + private final LinkedList dataQueue = new LinkedList(); + private final LinkedList ackQueue = new LinkedList(); + private DFSPacket currentPacket = null; private DataStreamer streamer; private long currentSeqno = 0; private long lastQueuedSeqno = -1; @@ -187,8 +185,8 @@ public class DFSOutputStream extends FSOutputSummer BlockStoragePolicySuite.createDefaultSuite(); /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ - private Packet createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, - long seqno) throws InterruptedIOException { + private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, + long seqno, boolean lastPacketInBlock) throws InterruptedIOException { final byte[] buf; final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; @@ -201,170 +199,18 @@ public class DFSOutputStream extends FSOutputSummer throw iioe; } - return new Packet(buf, chunksPerPkt, offsetInBlock, seqno, getChecksumSize()); + return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno, + getChecksumSize(), lastPacketInBlock); } /** * For heartbeat packets, create buffer directly by new byte[] * since heartbeats should not be blocked. */ - private Packet createHeartbeatPacket() throws InterruptedIOException { + private DFSPacket createHeartbeatPacket() throws InterruptedIOException { final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; - return new Packet(buf, 0, 0, Packet.HEART_BEAT_SEQNO, getChecksumSize()); - } - - private static class Packet { - private static final long HEART_BEAT_SEQNO = -1L; - long seqno; // sequencenumber of buffer in block - final long offsetInBlock; // offset in block - boolean syncBlock; // this packet forces the current block to disk - int numChunks; // number of chunks currently in packet - final int maxChunks; // max chunks in packet - private byte[] buf; - private boolean lastPacketInBlock; // is this the last packet in block? - - /** - * buf is pointed into like follows: - * (C is checksum data, D is payload data) - * - * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] - * ^ ^ ^ ^ - * | checksumPos dataStart dataPos - * checksumStart - * - * Right before sending, we move the checksum data to immediately precede - * the actual data, and then insert the header into the buffer immediately - * preceding the checksum data, so we make sure to keep enough space in - * front of the checksum data to support the largest conceivable header. - */ - int checksumStart; - int checksumPos; - final int dataStart; - int dataPos; - - /** - * Create a new packet. - * - * @param chunksPerPkt maximum number of chunks per packet. - * @param offsetInBlock offset in bytes into the HDFS block. - */ - private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, - int checksumSize) { - this.lastPacketInBlock = false; - this.numChunks = 0; - this.offsetInBlock = offsetInBlock; - this.seqno = seqno; - - this.buf = buf; - - checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; - checksumPos = checksumStart; - dataStart = checksumStart + (chunksPerPkt * checksumSize); - dataPos = dataStart; - maxChunks = chunksPerPkt; - } - - synchronized void writeData(byte[] inarray, int off, int len) - throws ClosedChannelException { - checkBuffer(); - if (dataPos + len > buf.length) { - throw new BufferOverflowException(); - } - System.arraycopy(inarray, off, buf, dataPos, len); - dataPos += len; - } - - synchronized void writeChecksum(byte[] inarray, int off, int len) - throws ClosedChannelException { - checkBuffer(); - if (len == 0) { - return; - } - if (checksumPos + len > dataStart) { - throw new BufferOverflowException(); - } - System.arraycopy(inarray, off, buf, checksumPos, len); - checksumPos += len; - } - - /** - * Write the full packet, including the header, to the given output stream. - */ - synchronized void writeTo(DataOutputStream stm) throws IOException { - checkBuffer(); - - final int dataLen = dataPos - dataStart; - final int checksumLen = checksumPos - checksumStart; - final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; - - PacketHeader header = new PacketHeader( - pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); - - if (checksumPos != dataStart) { - // Move the checksum to cover the gap. This can happen for the last - // packet or during an hflush/hsync call. - System.arraycopy(buf, checksumStart, buf, - dataStart - checksumLen , checksumLen); - checksumPos = dataStart; - checksumStart = checksumPos - checksumLen; - } - - final int headerStart = checksumStart - header.getSerializedSize(); - assert checksumStart + 1 >= header.getSerializedSize(); - assert checksumPos == dataStart; - assert headerStart >= 0; - assert headerStart + header.getSerializedSize() == checksumStart; - - // Copy the header data into the buffer immediately preceding the checksum - // data. - System.arraycopy(header.getBytes(), 0, buf, headerStart, - header.getSerializedSize()); - - // corrupt the data for testing. - if (DFSClientFaultInjector.get().corruptPacket()) { - buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; - } - - // Write the now contiguous full packet to the output stream. - stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); - - // undo corruption. - if (DFSClientFaultInjector.get().uncorruptPacket()) { - buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; - } - } - - private synchronized void checkBuffer() throws ClosedChannelException { - if (buf == null) { - throw new ClosedChannelException(); - } - } - - private synchronized void releaseBuffer(ByteArrayManager bam) { - bam.release(buf); - buf = null; - } - - // get the packet's last byte's offset in the block - synchronized long getLastByteOffsetBlock() { - return offsetInBlock + dataPos - dataStart; - } - - /** - * Check if this packet is a heart beat packet - * @return true if the sequence number is HEART_BEAT_SEQNO - */ - private boolean isHeartbeatPacket() { - return seqno == HEART_BEAT_SEQNO; - } - - @Override - public String toString() { - return "packet seqno: " + this.seqno + - " offsetInBlock: " + this.offsetInBlock + - " lastPacketInBlock: " + this.lastPacketInBlock + - " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); - } + return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, + getChecksumSize(), false); } // @@ -556,7 +402,7 @@ public class DFSOutputStream extends FSOutputSummer } } - Packet one; + DFSPacket one; try { // process datanode IO errors if any boolean doSleep = false; @@ -620,7 +466,7 @@ public class DFSOutputStream extends FSOutputSummer " Aborting file " + src); } - if (one.lastPacketInBlock) { + if (one.isLastPacketInBlock()) { // wait for all data packets have been successfully acked synchronized (dataQueue) { while (!streamerClosed && !hasError && @@ -681,7 +527,7 @@ public class DFSOutputStream extends FSOutputSummer } // Is this block full? - if (one.lastPacketInBlock) { + if (one.isLastPacketInBlock()) { // wait for the close packet has been acked synchronized (dataQueue) { while (!streamerClosed && !hasError && @@ -883,7 +729,7 @@ public class DFSOutputStream extends FSOutputSummer ack.readFields(blockReplyStream); long duration = Time.monotonicNow() - begin; if (duration > dfsclientSlowLogThresholdMs - && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) { + && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { DFSClient.LOG .warn("Slow ReadProcessor read fields took " + duration + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " @@ -920,21 +766,21 @@ public class DFSOutputStream extends FSOutputSummer assert seqno != PipelineAck.UNKOWN_SEQNO : "Ack for unknown seqno should be a failed ack: " + ack; - if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack + if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack continue; } // a success ack for a data packet - Packet one; + DFSPacket one; synchronized (dataQueue) { one = ackQueue.getFirst(); } - if (one.seqno != seqno) { + if (one.getSeqno() != seqno) { throw new IOException("ResponseProcessor: Expecting seqno " + " for block " + block + - one.seqno + " but received " + seqno); + one.getSeqno() + " but received " + seqno); } - isLastPacketInBlock = one.lastPacketInBlock; + isLastPacketInBlock = one.isLastPacketInBlock(); // Fail the packet write for testing in order to force a // pipeline recovery. @@ -1032,10 +878,10 @@ public class DFSOutputStream extends FSOutputSummer // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that // a client waiting on close() will be aware that the flush finished. synchronized (dataQueue) { - Packet endOfBlockPacket = dataQueue.remove(); // remove the end of block packet - assert endOfBlockPacket.lastPacketInBlock; - assert lastAckedSeqno == endOfBlockPacket.seqno - 1; - lastAckedSeqno = endOfBlockPacket.seqno; + DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet + assert endOfBlockPacket.isLastPacketInBlock(); + assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; + lastAckedSeqno = endOfBlockPacket.getSeqno(); dataQueue.notifyAll(); } endBlock(); @@ -1862,9 +1708,9 @@ public class DFSOutputStream extends FSOutputSummer synchronized (dataQueue) { if (currentPacket == null) return; dataQueue.addLast(currentPacket); - lastQueuedSeqno = currentPacket.seqno; + lastQueuedSeqno = currentPacket.getSeqno(); if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Queued packet " + currentPacket.seqno); + DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno()); } currentPacket = null; dataQueue.notifyAll(); @@ -1916,10 +1762,10 @@ public class DFSOutputStream extends FSOutputSummer if (currentPacket == null) { currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++); + bytesCurBlock, currentSeqno++, false); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + - currentPacket.seqno + + currentPacket.getSeqno() + ", src=" + src + ", packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket + @@ -1929,16 +1775,16 @@ public class DFSOutputStream extends FSOutputSummer currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); - currentPacket.numChunks++; + currentPacket.incNumChunks(); bytesCurBlock += len; // If packet is full, enqueue it for transmission // - if (currentPacket.numChunks == currentPacket.maxChunks || + if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || bytesCurBlock == blockSize) { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + - currentPacket.seqno + + currentPacket.getSeqno() + ", src=" + src + ", bytesCurBlock=" + bytesCurBlock + ", blockSize=" + blockSize + @@ -1963,9 +1809,8 @@ public class DFSOutputStream extends FSOutputSummer // indicate the end of block and reset bytesCurBlock. // if (bytesCurBlock == blockSize) { - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++); - currentPacket.lastPacketInBlock = true; - currentPacket.syncBlock = shouldSyncBlock; + currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + currentPacket.setSyncBlock(shouldSyncBlock); waitAndQueueCurrentPacket(); bytesCurBlock = 0; lastFlushOffset = 0; @@ -2058,7 +1903,7 @@ public class DFSOutputStream extends FSOutputSummer // but sync was requested. // Send an empty packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++); + bytesCurBlock, currentSeqno++, false); } } else { if (isSync && bytesCurBlock > 0 && !endBlock) { @@ -2067,7 +1912,7 @@ public class DFSOutputStream extends FSOutputSummer // and sync was requested. // So send an empty sync packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - bytesCurBlock, currentSeqno++); + bytesCurBlock, currentSeqno++, false); } else if (currentPacket != null) { // just discard the current packet since it is already been sent. currentPacket.releaseBuffer(byteArrayManager); @@ -2075,15 +1920,14 @@ public class DFSOutputStream extends FSOutputSummer } } if (currentPacket != null) { - currentPacket.syncBlock = isSync; + currentPacket.setSyncBlock(isSync); waitAndQueueCurrentPacket(); } if (endBlock && bytesCurBlock > 0) { // Need to end the current block, thus send an empty packet to // indicate this is the end of the block and reset bytesCurBlock - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++); - currentPacket.lastPacketInBlock = true; - currentPacket.syncBlock = shouldSyncBlock || isSync; + currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + currentPacket.setSyncBlock(shouldSyncBlock || isSync); waitAndQueueCurrentPacket(); bytesCurBlock = 0; lastFlushOffset = 0; @@ -2254,8 +2098,8 @@ public class DFSOutputStream extends FSOutputSummer } } - private static void releaseBuffer(List packets, ByteArrayManager bam) { - for(Packet p : packets) { + private static void releaseBuffer(List packets, ByteArrayManager bam) { + for (DFSPacket p : packets) { p.releaseBuffer(bam); } packets.clear(); @@ -2302,9 +2146,8 @@ public class DFSOutputStream extends FSOutputSummer if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++); - currentPacket.lastPacketInBlock = true; - currentPacket.syncBlock = shouldSyncBlock; + currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); + currentPacket.setSyncBlock(shouldSyncBlock); } flushInternal(); // flush all data to Datanodes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java new file mode 100755 index 00000000000..9b3ea515b4f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.channels.ClosedChannelException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.util.ByteArrayManager; + +/**************************************************************** + * DFSPacket is used by DataStreamer and DFSOutputStream. + * DFSOutputStream generates packets and then ask DatStreamer + * to send them to datanodes. + ****************************************************************/ + +class DFSPacket { + public static final long HEART_BEAT_SEQNO = -1L; + private final long seqno; // sequence number of buffer in block + private final long offsetInBlock; // offset in block + private boolean syncBlock; // this packet forces the current block to disk + private int numChunks; // number of chunks currently in packet + private final int maxChunks; // max chunks in packet + private byte[] buf; + private final boolean lastPacketInBlock; // is this the last packet in block? + + /** + * buf is pointed into like follows: + * (C is checksum data, D is payload data) + * + * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___] + * ^ ^ ^ ^ + * | checksumPos dataStart dataPos + * checksumStart + * + * Right before sending, we move the checksum data to immediately precede + * the actual data, and then insert the header into the buffer immediately + * preceding the checksum data, so we make sure to keep enough space in + * front of the checksum data to support the largest conceivable header. + */ + private int checksumStart; + private int checksumPos; + private final int dataStart; + private int dataPos; + + /** + * Create a new packet. + * + * @param buf the buffer storing data and checksums + * @param chunksPerPkt maximum number of chunks per packet. + * @param offsetInBlock offset in bytes into the HDFS block. + * @param seqno the sequence number of this packet + * @param checksumSize the size of checksum + * @param lastPacketInBlock if this is the last packet + */ + DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno, + int checksumSize, boolean lastPacketInBlock) { + this.lastPacketInBlock = lastPacketInBlock; + this.numChunks = 0; + this.offsetInBlock = offsetInBlock; + this.seqno = seqno; + + this.buf = buf; + + checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; + checksumPos = checksumStart; + dataStart = checksumStart + (chunksPerPkt * checksumSize); + dataPos = dataStart; + maxChunks = chunksPerPkt; + } + + /** + * Write data to this packet. + * + * @param inarray input array of data + * @param off the offset of data to write + * @param len the length of data to write + * @throws ClosedChannelException + */ + synchronized void writeData(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (dataPos + len > buf.length) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, dataPos, len); + dataPos += len; + } + + /** + * Write checksums to this packet + * + * @param inarray input array of checksums + * @param off the offset of checksums to write + * @param len the length of checksums to write + * @throws ClosedChannelException + */ + synchronized void writeChecksum(byte[] inarray, int off, int len) + throws ClosedChannelException { + checkBuffer(); + if (len == 0) { + return; + } + if (checksumPos + len > dataStart) { + throw new BufferOverflowException(); + } + System.arraycopy(inarray, off, buf, checksumPos, len); + checksumPos += len; + } + + /** + * Write the full packet, including the header, to the given output stream. + * + * @param stm + * @throws IOException + */ + synchronized void writeTo(DataOutputStream stm) throws IOException { + checkBuffer(); + + final int dataLen = dataPos - dataStart; + final int checksumLen = checksumPos - checksumStart; + final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen; + + PacketHeader header = new PacketHeader( + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); + + if (checksumPos != dataStart) { + // Move the checksum to cover the gap. This can happen for the last + // packet or during an hflush/hsync call. + System.arraycopy(buf, checksumStart, buf, + dataStart - checksumLen , checksumLen); + checksumPos = dataStart; + checksumStart = checksumPos - checksumLen; + } + + final int headerStart = checksumStart - header.getSerializedSize(); + assert checksumStart + 1 >= header.getSerializedSize(); + assert headerStart >= 0; + assert headerStart + header.getSerializedSize() == checksumStart; + + // Copy the header data into the buffer immediately preceding the checksum + // data. + System.arraycopy(header.getBytes(), 0, buf, headerStart, + header.getSerializedSize()); + + // corrupt the data for testing. + if (DFSClientFaultInjector.get().corruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + + // Write the now contiguous full packet to the output stream. + stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen); + + // undo corruption. + if (DFSClientFaultInjector.get().uncorruptPacket()) { + buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff; + } + } + + private synchronized void checkBuffer() throws ClosedChannelException { + if (buf == null) { + throw new ClosedChannelException(); + } + } + + /** + * Release the buffer in this packet to ByteArrayManager. + * + * @param bam + */ + synchronized void releaseBuffer(ByteArrayManager bam) { + bam.release(buf); + buf = null; + } + + /** + * get the packet's last byte's offset in the block + * + * @return the packet's last byte's offset in the block + */ + synchronized long getLastByteOffsetBlock() { + return offsetInBlock + dataPos - dataStart; + } + + /** + * Check if this packet is a heart beat packet + * + * @return true if the sequence number is HEART_BEAT_SEQNO + */ + boolean isHeartbeatPacket() { + return seqno == HEART_BEAT_SEQNO; + } + + /** + * check if this packet is the last packet in block + * + * @return true if the packet is the last packet + */ + boolean isLastPacketInBlock(){ + return lastPacketInBlock; + } + + /** + * get sequence number of this packet + * + * @return the sequence number of this packet + */ + long getSeqno(){ + return seqno; + } + + /** + * get the number of chunks this packet contains + * + * @return the number of chunks in this packet + */ + synchronized int getNumChunks(){ + return numChunks; + } + + /** + * increase the number of chunks by one + */ + synchronized void incNumChunks(){ + numChunks++; + } + + /** + * get the maximum number of packets + * + * @return the maximum number of packets + */ + int getMaxChunks(){ + return maxChunks; + } + + /** + * set if to sync block + * + * @param syncBlock if to sync block + */ + synchronized void setSyncBlock(boolean syncBlock){ + this.syncBlock = syncBlock; + } + + @Override + public String toString() { + return "packet seqno: " + this.seqno + + " offsetInBlock: " + this.offsetInBlock + + " lastPacketInBlock: " + this.lastPacketInBlock + + " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java new file mode 100755 index 00000000000..8bf60971b3d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.util.Random; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.io.DataOutputBuffer; +import org.junit.Assert; +import org.junit.Test; + +public class TestDFSPacket { + private static final int chunkSize = 512; + private static final int checksumSize = 4; + private static final int maxChunksPerPacket = 4; + + @Test + public void testPacket() throws Exception { + Random r = new Random(12345L); + byte[] data = new byte[chunkSize]; + r.nextBytes(data); + byte[] checksum = new byte[checksumSize]; + r.nextBytes(checksum); + + DataOutputBuffer os = new DataOutputBuffer(data.length * 2); + + byte[] packetBuf = new byte[data.length * 2]; + DFSPacket p = new DFSPacket(packetBuf, maxChunksPerPacket, + 0, 0, checksumSize, false); + p.setSyncBlock(true); + p.writeData(data, 0, data.length); + p.writeChecksum(checksum, 0, checksum.length); + p.writeTo(os); + + //we have set syncBlock to true, so the header has the maximum length + int headerLen = PacketHeader.PKT_MAX_HEADER_LEN; + byte[] readBuf = os.getData(); + + assertArrayRegionsEqual(readBuf, headerLen, checksum, 0, checksum.length); + assertArrayRegionsEqual(readBuf, headerLen + checksum.length, data, 0, data.length); + + } + + public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, + int off2, int len) { + for (int i = 0; i < len; i++) { + if (buf1[off1 + i] != buf2[off2 + i]) { + Assert.fail("arrays differ at byte " + i + ". " + + "The first array has " + (int) buf1[off1 + i] + + ", but the second array has " + (int) buf2[off2 + i]); + } + } + } +}