From 7f62e41b2eaae3edc0a01fc5d8cdf32ff7ded708 Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Thu, 18 Jun 2015 08:48:09 -0700
Subject: [PATCH] HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285
 branch. (vinayakumarb)

(cherry picked from commit 1c13519e1e7588c3e2974138d37bf3449ca8b3df)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../apache/hadoop/hdfs/DFSOutputStream.java   | 61 ++++++++++---------
 .../org/apache/hadoop/hdfs/DataStreamer.java  |  7 ++-
 3 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 469272a0b2b..001c9b2f874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -309,6 +309,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-6249. Output AclEntry in PBImageXmlWriter. (surendra singh lilhore
     via aajisaka)
 
+    HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
+    (vinayakumarb via wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 30d6b6b2229..d160b2b5a1d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.htrace.Sampler;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
+  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
   /**
    * Number of times to retry creating a file when there are transient
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -413,23 +416,32 @@ public class DFSOutputStream extends FSOutputSummer
     //
     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
         getStreamer().getBytesCurBlock() == blockSize) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
-            currentPacket.getSeqno() +
-            ", src=" + src +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
-            ", blockSize=" + blockSize +
-            ", appendChunk=" + getStreamer().getAppendChunk());
-      }
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
-
-      adjustChunkBoundary();
-
-      endBlock();
+      enqueueCurrentPacketFull();
     }
   }
 
+  void enqueueCurrentPacket() throws IOException {
+    getStreamer().waitAndQueuePacket(currentPacket);
+    currentPacket = null;
+  }
+
+  void enqueueCurrentPacketFull() throws IOException {
+    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+        + " appendChunk={}, {}", currentPacket, src, getStreamer()
+        .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+        getStreamer());
+    enqueueCurrentPacket();
+    adjustChunkBoundary();
+    endBlock();
+  }
+
+  /** create an empty packet to mark the end of the block. */
+  void setCurrentPacketToEmpty() throws InterruptedIOException {
+    currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+        getStreamer().getAndIncCurrentSeqno(), true);
+    currentPacket.setSyncBlock(shouldSyncBlock);
+  }
+
   /**
    * If the reopened file did not end at chunk boundary and the above
    * write filled up its partial chunk. Tell the summer to generate full
@@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
    */
   protected void endBlock() throws IOException {
     if (getStreamer().getBytesCurBlock() == blockSize) {
-      currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-          getStreamer().getAndIncCurrentSeqno(), true);
-      currentPacket.setSyncBlock(shouldSyncBlock);
-      getStreamer().waitAndQueuePacket(currentPacket);
-      currentPacket = null;
+      setCurrentPacketToEmpty();
+      enqueueCurrentPacket();
       getStreamer().setBytesCurBlock(0);
       lastFlushOffset = 0;
     }
@@ -591,8 +600,7 @@ public class DFSOutputStream extends FSOutputSummer
       }
       if (currentPacket != null) {
         currentPacket.setSyncBlock(isSync);
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
       if (endBlock && getStreamer().getBytesCurBlock() > 0) {
         // Need to end the current block, thus send an empty packet to
@@ -600,8 +608,7 @@ public class DFSOutputStream extends FSOutputSummer
         currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
             getStreamer().getAndIncCurrentSeqno(), true);
         currentPacket.setSyncBlock(shouldSyncBlock || isSync);
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
         getStreamer().setBytesCurBlock(0);
         lastFlushOffset = 0;
       } else {
@@ -780,15 +787,11 @@ public class DFSOutputStream extends FSOutputSummer
       flushBuffer();       // flush from all upper layers
 
       if (currentPacket != null) {
-        getStreamer().waitAndQueuePacket(currentPacket);
-        currentPacket = null;
+        enqueueCurrentPacket();
       }
 
       if (getStreamer().getBytesCurBlock() != 0) {
-        // send an empty packet to mark the end of the block
-        currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
-            getStreamer().getAndIncCurrentSeqno(), true);
-        currentPacket.setSyncBlock(shouldSyncBlock);
+        setCurrentPacketToEmpty();
       }
 
       flushInternal();             // flush all data to Datanodes
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index cecd5a0eccf..8dd85b72375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -1901,4 +1900,10 @@ class DataStreamer extends Daemon {
       s.close();
     }
   }
+
+  @Override
+  public String toString() {
+    return (block == null? null: block.getLocalBlock())
+        + "@" + Arrays.toString(getNodes());
+  }
 }