HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch. (vinayakumarb)

(cherry picked from commit 1c13519e1e)
This commit is contained in:
Andrew Wang 2015-06-18 08:48:09 -07:00
parent 3b9698ecac
commit 7f62e41b2e
3 changed files with 41 additions and 30 deletions

View File

@ -309,6 +309,9 @@ Release 2.8.0 - UNRELEASED
HDFS-6249. Output AclEntry in PBImageXmlWriter. HDFS-6249. Output AclEntry in PBImageXmlWriter.
(surendra singh lilhore via aajisaka) (surendra singh lilhore via aajisaka)
HDFS-8605. Merge Refactor of DFSOutputStream from HDFS-7285 branch.
(vinayakumarb via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -64,6 +64,8 @@ import org.apache.hadoop.util.Time;
import org.apache.htrace.Sampler; import org.apache.htrace.Sampler;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -86,6 +88,7 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind { implements Syncable, CanSetDropBehind {
static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
/** /**
* Number of times to retry creating a file when there are transient * Number of times to retry creating a file when there are transient
* errors (typically related to encryption zones and KeyProvider operations). * errors (typically related to encryption zones and KeyProvider operations).
@ -413,23 +416,32 @@ public class DFSOutputStream extends FSOutputSummer
// //
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) { getStreamer().getBytesCurBlock() == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) { enqueueCurrentPacketFull();
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
", blockSize=" + blockSize +
", appendChunk=" + getStreamer().getAppendChunk());
}
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
endBlock();
} }
} }
/**
 * Hands {@code currentPacket} off to the streamer's packet queue and clears
 * the local reference so a fresh packet can be started by the caller.
 * The streamer call presumably blocks until queue space is available
 * (per its name, waitAndQueuePacket) — confirm against DataStreamer.
 *
 * @throws IOException if the streamer fails while queueing the packet
 */
void enqueueCurrentPacket() throws IOException {
getStreamer().waitAndQueuePacket(currentPacket);
// Cleared only after a successful enqueue; if waitAndQueuePacket throws,
// currentPacket is intentionally left set.
currentPacket = null;
}
/**
 * Enqueues the current (full) packet and performs the follow-up bookkeeping:
 * adjusts the chunk boundary for append-mode writes and, if the block is now
 * full, ends the current block.
 *
 * @throws IOException if queueing the packet or ending the block fails
 */
void enqueueCurrentPacketFull() throws IOException {
// Parameterized SLF4J logging: the message is only formatted when DEBUG
// is enabled, replacing the older isDebugEnabled()/string-concat pattern.
LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ " appendChunk={}, {}", currentPacket, src, getStreamer()
.getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
getStreamer());
enqueueCurrentPacket();
adjustChunkBoundary();
endBlock();
}
/**
 * Replaces {@code currentPacket} with an empty (zero-length) packet whose
 * {@code lastPacketInBlock} flag is set, marking the end of the current
 * block. The packet's sync-block flag mirrors this stream's
 * {@code shouldSyncBlock} setting.
 *
 * @throws InterruptedIOException if packet creation is interrupted
 */
void setCurrentPacketToEmpty() throws InterruptedIOException {
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
}
/** /**
* If the reopened file did not end at chunk boundary and the above * If the reopened file did not end at chunk boundary and the above
* write filled up its partial chunk. Tell the summer to generate full * write filled up its partial chunk. Tell the summer to generate full
@ -457,11 +469,8 @@ public class DFSOutputStream extends FSOutputSummer
*/ */
protected void endBlock() throws IOException { protected void endBlock() throws IOException {
if (getStreamer().getBytesCurBlock() == blockSize) { if (getStreamer().getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), setCurrentPacketToEmpty();
getStreamer().getAndIncCurrentSeqno(), true); enqueueCurrentPacket();
currentPacket.setSyncBlock(shouldSyncBlock);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
getStreamer().setBytesCurBlock(0); getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0; lastFlushOffset = 0;
} }
@ -591,8 +600,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
if (currentPacket != null) { if (currentPacket != null) {
currentPacket.setSyncBlock(isSync); currentPacket.setSyncBlock(isSync);
getStreamer().waitAndQueuePacket(currentPacket); enqueueCurrentPacket();
currentPacket = null;
} }
if (endBlock && getStreamer().getBytesCurBlock() > 0) { if (endBlock && getStreamer().getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to // Need to end the current block, thus send an empty packet to
@ -600,8 +608,7 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true); getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync); currentPacket.setSyncBlock(shouldSyncBlock || isSync);
getStreamer().waitAndQueuePacket(currentPacket); enqueueCurrentPacket();
currentPacket = null;
getStreamer().setBytesCurBlock(0); getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0; lastFlushOffset = 0;
} else { } else {
@ -780,15 +787,11 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers flushBuffer(); // flush from all upper layers
if (currentPacket != null) { if (currentPacket != null) {
getStreamer().waitAndQueuePacket(currentPacket); enqueueCurrentPacket();
currentPacket = null;
} }
if (getStreamer().getBytesCurBlock() != 0) { if (getStreamer().getBytesCurBlock() != 0) {
// send an empty packet to mark the end of the block setCurrentPacketToEmpty();
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
} }
flushInternal(); // flush all data to Datanodes flushInternal(); // flush all data to Datanodes

View File

@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@ -1901,4 +1900,10 @@ class DataStreamer extends Daemon {
s.close(); s.close();
} }
} }
/**
 * Renders this streamer as {@code <localBlock>@<nodes>} for log messages;
 * when no block is assigned yet the block part prints as {@code "null"}.
 */
@Override
public String toString() {
  final Object blockPart = (block == null) ? null : block.getLocalBlock();
  return blockPart + "@" + Arrays.toString(getNodes());
}
} }