From 357fbc1dd232e1567262951ec7bf2b593c5ee154 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Fri, 11 Jul 2014 22:21:30 +0000 Subject: [PATCH] HDFS-3851. Merging change r1377372 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609857 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/hdfs/DFSOutputStream.java | 67 +++++++++---------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 3419784a465..920ef9f4048 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -61,8 +61,8 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; @@ -120,9 +120,9 @@ import com.google.common.cache.RemovalNotification; @InterfaceAudience.Private public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { - private final DFSClient dfsClient; private final long dfsclientSlowLogThresholdMs; private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB + private final DFSClient dfsClient; private Socket s; // closed is accessed by different threads under different locks. private volatile boolean closed = false; @@ -155,15 +155,15 @@ public class DFSOutputStream extends FSOutputSummer private final AtomicReference cachingStrategy; private boolean failPacket = false; - private class Packet { - final long seqno; // sequencenumber of buffer in block - final long offsetInBlock; // offset in block - private boolean lastPacketInBlock; // is this the last packet in block? - boolean syncBlock; // this packet forces the current block to disk - int numChunks; // number of chunks currently in packet - final int maxChunks; // max chunks in packet - + private static class Packet { + private static final long HEART_BEAT_SEQNO = -1L; + long seqno; // sequencenumber of buffer in block + final long offsetInBlock; // offset in block + boolean syncBlock; // this packet forces the current block to disk + int numChunks; // number of chunks currently in packet + final int maxChunks; // max chunks in packet final byte[] buf; + private boolean lastPacketInBlock; // is this the last packet in block? /** * buf is pointed into like follows: @@ -181,45 +181,36 @@ public class DFSOutputStream extends FSOutputSummer */ int checksumStart; int checksumPos; - int dataStart; + final int dataStart; int dataPos; - private static final long HEART_BEAT_SEQNO = -1L; - /** * Create a heartbeat packet. */ - Packet() { - this.lastPacketInBlock = false; - this.numChunks = 0; - this.offsetInBlock = 0; - this.seqno = HEART_BEAT_SEQNO; - - buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; - - checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN; - maxChunks = 0; + Packet(int checksumSize) { + this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize); } /** * Create a new packet. * - * @param pktSize maximum size of the packet, including checksum data and actual data. + * @param pktSize maximum size of the packet, + * including checksum data and actual data. * @param chunksPerPkt maximum number of chunks per packet. * @param offsetInBlock offset in bytes into the HDFS block. */ - Packet(int pktSize, int chunksPerPkt, long offsetInBlock) { + Packet(int pktSize, int chunksPerPkt, long offsetInBlock, + long seqno, int checksumSize) { this.lastPacketInBlock = false; this.numChunks = 0; this.offsetInBlock = offsetInBlock; - this.seqno = currentSeqno; - currentSeqno++; + this.seqno = seqno; buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize]; checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; checksumPos = checksumStart; - dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize()); + dataStart = checksumStart + (chunksPerPkt * checksumSize); dataPos = dataStart; maxChunks = chunksPerPkt; } @@ -476,6 +467,7 @@ public class DFSOutputStream extends FSOutputSummer response.join(); response = null; } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } } @@ -502,6 +494,7 @@ public class DFSOutputStream extends FSOutputSummer try { dataQueue.wait(timeout); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } doSleep = false; now = Time.now(); @@ -511,7 +504,7 @@ public class DFSOutputStream extends FSOutputSummer } // get packet to be sent. if (dataQueue.isEmpty()) { - one = new Packet(); // heartbeat packet + one = new Packet(checksum.getChecksumSize()); // heartbeat packet } else { one = dataQueue.getFirst(); // regular data packet } @@ -551,6 +544,7 @@ public class DFSOutputStream extends FSOutputSummer // wait for acks to arrive from datanodes dataQueue.wait(1000); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } } } @@ -675,6 +669,7 @@ public class DFSOutputStream extends FSOutputSummer response.close(); response.join(); } catch (InterruptedException e) { + DFSClient.LOG.warn("Caught exception ", e); } finally { response = null; } @@ -1478,6 +1473,7 @@ public class DFSOutputStream extends FSOutputSummer Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); } } } else { @@ -1738,7 +1734,7 @@ public class DFSOutputStream extends FSOutputSummer if (currentPacket == null) { currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + @@ -1785,7 +1781,8 @@ public class DFSOutputStream extends FSOutputSummer // indicate the end of block and reset bytesCurBlock. // if (bytesCurBlock == blockSize) { - currentPacket = new Packet(0, 0, bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock, + currentSeqno++, this.checksum.getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); @@ -1886,7 +1883,7 @@ public class DFSOutputStream extends FSOutputSummer // but sync was requested. // Send an empty packet currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); } } else { // We already flushed up to this offset. @@ -1903,7 +1900,7 @@ public class DFSOutputStream extends FSOutputSummer // and sync was requested. // So send an empty sync packet. currentPacket = new Packet(packetSize, chunksPerPacket, - bytesCurBlock); + bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize()); } else { // just discard the current packet since it is already been sent. currentPacket = null; @@ -2107,7 +2104,8 @@ public class DFSOutputStream extends FSOutputSummer if (bytesCurBlock != 0) { // send an empty packet to mark the end of the block - currentPacket = new Packet(0, 0, bytesCurBlock); + currentPacket = new Packet(0, 0, bytesCurBlock, + currentSeqno++, this.checksum.getChecksumSize()); currentPacket.lastPacketInBlock = true; currentPacket.syncBlock = shouldSyncBlock; } @@ -2157,6 +2155,7 @@ public class DFSOutputStream extends FSOutputSummer DFSClient.LOG.info("Could not complete " + src + " retrying..."); } } catch (InterruptedException ie) { + DFSClient.LOG.warn("Caught exception ", ie); } } }