HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.

(cherry picked from commit efc510a570)

 Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
This commit is contained in:
Andrew Wang 2015-06-02 15:39:24 -07:00
parent 17e369511d
commit 379ece15fe
2 changed files with 93 additions and 71 deletions

View File

@ -251,6 +251,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8489. Subclass BlockInfo to represent contiguous blocks. HDFS-8489. Subclass BlockInfo to represent contiguous blocks.
(Zhe Zhang via jing9) (Zhe Zhang via jing9)
HDFS-8386. Improve synchronization of 'streamer' reference in
DFSOutputStream. (Rakesh R via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -139,7 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
@Override @Override
protected void checkClosed() throws IOException { protected void checkClosed() throws IOException {
if (isClosed()) { if (isClosed()) {
streamer.getLastException().throwException4Close(); getStreamer().getLastException().throwException4Close();
} }
} }
@ -148,10 +148,10 @@ public class DFSOutputStream extends FSOutputSummer
// //
@VisibleForTesting @VisibleForTesting
public synchronized DatanodeInfo[] getPipeline() { public synchronized DatanodeInfo[] getPipeline() {
if (streamer.streamerClosed()) { if (getStreamer().streamerClosed()) {
return null; return null;
} }
DatanodeInfo[] currentNodes = streamer.getNodes(); DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) { if (currentNodes == null) {
return null; return null;
} }
@ -293,9 +293,9 @@ public class DFSOutputStream extends FSOutputSummer
// indicate that we are appending to an existing block // indicate that we are appending to an existing block
streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager); cachingStrategy, byteArrayManager);
streamer.setBytesCurBlock(lastBlock.getBlockSize()); getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
adjustPacketChunkSize(stat); adjustPacketChunkSize(stat);
streamer.setPipelineInConstruction(lastBlock); getStreamer().setPipelineInConstruction(lastBlock);
} else { } else {
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum); bytesPerChecksum);
@ -329,7 +329,7 @@ public class DFSOutputStream extends FSOutputSummer
// //
computePacketChunkSize(0, freeInCksum); computePacketChunkSize(0, freeInCksum);
setChecksumBufSize(freeInCksum); setChecksumBufSize(freeInCksum);
streamer.setAppendChunk(true); getStreamer().setAppendChunk(true);
} else { } else {
// if the remaining space in the block is smaller than // if the remaining space in the block is smaller than
// that expected size of of a packet, then create // that expected size of of a packet, then create
@ -392,36 +392,36 @@ public class DFSOutputStream extends FSOutputSummer
} }
if (currentPacket == null) { if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket, currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.getSeqno() + currentPacket.getSeqno() +
", src=" + src + ", src=" + src +
", packetSize=" + packetSize + ", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket + ", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + streamer.getBytesCurBlock()); ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
} }
} }
currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len); currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks(); currentPacket.incNumChunks();
streamer.incBytesCurBlock(len); getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission // If packet is full, enqueue it for transmission
// //
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
streamer.getBytesCurBlock() == blockSize) { getStreamer().getBytesCurBlock() == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() + currentPacket.getSeqno() +
", src=" + src + ", src=" + src +
", bytesCurBlock=" + streamer.getBytesCurBlock() + ", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
", blockSize=" + blockSize + ", blockSize=" + blockSize +
", appendChunk=" + streamer.getAppendChunk()); ", appendChunk=" + getStreamer().getAppendChunk());
} }
streamer.waitAndQueuePacket(currentPacket); getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
adjustChunkBoundary(); adjustChunkBoundary();
@ -436,14 +436,14 @@ public class DFSOutputStream extends FSOutputSummer
* crc chunks from now on. * crc chunks from now on.
*/ */
protected void adjustChunkBoundary() { protected void adjustChunkBoundary() {
if (streamer.getAppendChunk() && if (getStreamer().getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) { getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false); getStreamer().setAppendChunk(false);
resetChecksumBufSize(); resetChecksumBufSize();
} }
if (!streamer.getAppendChunk()) { if (!getStreamer().getAppendChunk()) {
int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()), int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
dfsClient.getConf().getWritePacketSize()); dfsClient.getConf().getWritePacketSize());
computePacketChunkSize(psize, bytesPerChecksum); computePacketChunkSize(psize, bytesPerChecksum);
} }
@ -456,13 +456,13 @@ public class DFSOutputStream extends FSOutputSummer
* @throws IOException * @throws IOException
*/ */
protected void endBlock() throws IOException { protected void endBlock() throws IOException {
if (streamer.getBytesCurBlock() == blockSize) { if (getStreamer().getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true); getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock); currentPacket.setSyncBlock(shouldSyncBlock);
streamer.waitAndQueuePacket(currentPacket); getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
streamer.setBytesCurBlock(0); getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0; lastFlushOffset = 0;
} }
} }
@ -555,31 +555,34 @@ public class DFSOutputStream extends FSOutputSummer
// bytesCurBlock potentially incremented if there was buffered data // bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient flush():" DFSClient.LOG.debug("DFSClient flush(): "
+ " bytesCurBlock=" + streamer.getBytesCurBlock() + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+ " lastFlushOffset=" + lastFlushOffset + " lastFlushOffset=" + lastFlushOffset
+ " createNewBlock=" + endBlock); + " createNewBlock=" + endBlock);
} }
// Flush only if we haven't already flushed till this offset. // Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != streamer.getBytesCurBlock()) { if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
assert streamer.getBytesCurBlock() > lastFlushOffset; assert getStreamer().getBytesCurBlock() > lastFlushOffset;
// record the valid offset of this flush // record the valid offset of this flush
lastFlushOffset = streamer.getBytesCurBlock(); lastFlushOffset = getStreamer().getBytesCurBlock();
if (isSync && currentPacket == null && !endBlock) { if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now, // Nothing to send right now,
// but sync was requested. // but sync was requested.
// Send an empty packet if we do not end the block right now // Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket, currentPacket = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); getStreamer().getBytesCurBlock(), getStreamer()
.getAndIncCurrentSeqno(), false);
} }
} else { } else {
if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) { if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
// Nothing to send right now, // Nothing to send right now,
// and the block was partially written, // and the block was partially written,
// and sync was requested. // and sync was requested.
// So send an empty sync packet if we do not end the block right now // So send an empty sync packet if we do not end the block right
// now
currentPacket = createPacket(packetSize, chunksPerPacket, currentPacket = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); getStreamer().getBytesCurBlock(), getStreamer()
.getAndIncCurrentSeqno(), false);
} else if (currentPacket != null) { } else if (currentPacket != null) {
// just discard the current packet since it is already been sent. // just discard the current packet since it is already been sent.
currentPacket.releaseBuffer(byteArrayManager); currentPacket.releaseBuffer(byteArrayManager);
@ -588,42 +591,44 @@ public class DFSOutputStream extends FSOutputSummer
} }
if (currentPacket != null) { if (currentPacket != null) {
currentPacket.setSyncBlock(isSync); currentPacket.setSyncBlock(isSync);
streamer.waitAndQueuePacket(currentPacket); getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
} }
if (endBlock && streamer.getBytesCurBlock() > 0) { if (endBlock && getStreamer().getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to // Need to end the current block, thus send an empty packet to
// indicate this is the end of the block and reset bytesCurBlock // indicate this is the end of the block and reset bytesCurBlock
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true); getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync); currentPacket.setSyncBlock(shouldSyncBlock || isSync);
streamer.waitAndQueuePacket(currentPacket); getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
streamer.setBytesCurBlock(0); getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0; lastFlushOffset = 0;
} else { } else {
// Restore state of stream. Record the last flush offset // Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed. // of the last full chunk that was flushed.
streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept); getStreamer().setBytesCurBlock(
getStreamer().getBytesCurBlock() - numKept);
} }
toWaitFor = streamer.getLastQueuedSeqno(); toWaitFor = getStreamer().getLastQueuedSeqno();
} // end synchronized } // end synchronized
streamer.waitForAckedSeqno(toWaitFor); getStreamer().waitForAckedSeqno(toWaitFor);
// update the block length first time irrespective of flag // update the block length first time irrespective of flag
if (updateLength || streamer.getPersistBlocks().get()) { if (updateLength || getStreamer().getPersistBlocks().get()) {
synchronized (this) { synchronized (this) {
if (!streamer.streamerClosed() && streamer.getBlock() != null) { if (!getStreamer().streamerClosed()
lastBlockLength = streamer.getBlock().getNumBytes(); && getStreamer().getBlock() != null) {
lastBlockLength = getStreamer().getBlock().getNumBytes();
} }
} }
} }
// If 1) any new blocks were allocated since the last flush, or 2) to // If 1) any new blocks were allocated since the last flush, or 2) to
// update length in NN is required, then persist block locations on // update length in NN is required, then persist block locations on
// namenode. // namenode.
if (streamer.getPersistBlocks().getAndSet(false) || updateLength) { if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
try { try {
dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
lastBlockLength); lastBlockLength);
@ -640,8 +645,8 @@ public class DFSOutputStream extends FSOutputSummer
} }
synchronized(this) { synchronized(this) {
if (!streamer.streamerClosed()) { if (!getStreamer().streamerClosed()) {
streamer.setHflush(); getStreamer().setHflush();
} }
} }
} catch (InterruptedIOException interrupt) { } catch (InterruptedIOException interrupt) {
@ -653,7 +658,7 @@ public class DFSOutputStream extends FSOutputSummer
DFSClient.LOG.warn("Error while syncing", e); DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) { synchronized (this) {
if (!isClosed()) { if (!isClosed()) {
streamer.getLastException().set(e); getStreamer().getLastException().set(e);
closeThreads(true); closeThreads(true);
} }
} }
@ -678,10 +683,10 @@ public class DFSOutputStream extends FSOutputSummer
public synchronized int getCurrentBlockReplication() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
checkClosed(); checkClosed();
if (streamer.streamerClosed()) { if (getStreamer().streamerClosed()) {
return blockReplication; // no pipeline, return repl factor of file return blockReplication; // no pipeline, return repl factor of file
} }
DatanodeInfo[] currentNodes = streamer.getNodes(); DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) { if (currentNodes == null) {
return blockReplication; // no pipeline, return repl factor of file return blockReplication; // no pipeline, return repl factor of file
} }
@ -700,16 +705,16 @@ public class DFSOutputStream extends FSOutputSummer
// //
// If there is data in the current buffer, send it across // If there is data in the current buffer, send it across
// //
streamer.queuePacket(currentPacket); getStreamer().queuePacket(currentPacket);
currentPacket = null; currentPacket = null;
toWaitFor = streamer.getLastQueuedSeqno(); toWaitFor = getStreamer().getLastQueuedSeqno();
} }
streamer.waitForAckedSeqno(toWaitFor); getStreamer().waitForAckedSeqno(toWaitFor);
} }
protected synchronized void start() { protected synchronized void start() {
streamer.start(); getStreamer().start();
} }
/** /**
@ -720,32 +725,32 @@ public class DFSOutputStream extends FSOutputSummer
if (isClosed()) { if (isClosed()) {
return; return;
} }
streamer.getLastException().set(new IOException("Lease timeout of " getStreamer().getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true); closeThreads(true);
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
} }
boolean isClosed() { boolean isClosed() {
return closed || streamer.streamerClosed(); return closed || getStreamer().streamerClosed();
} }
void setClosed() { void setClosed() {
closed = true; closed = true;
streamer.release(); getStreamer().release();
} }
// shutdown datastreamer and responseprocessor threads. // shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true // interrupt datastreamer if force is true
protected void closeThreads(boolean force) throws IOException { protected void closeThreads(boolean force) throws IOException {
try { try {
streamer.close(force); getStreamer().close(force);
streamer.join(); getStreamer().join();
streamer.closeSocket(); getStreamer().closeSocket();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer"); throw new IOException("Failed to shutdown streamer");
} finally { } finally {
streamer.setSocketToNull(); getStreamer().setSocketToNull();
setClosed(); setClosed();
} }
} }
@ -767,7 +772,7 @@ public class DFSOutputStream extends FSOutputSummer
protected synchronized void closeImpl() throws IOException { protected synchronized void closeImpl() throws IOException {
if (isClosed()) { if (isClosed()) {
streamer.getLastException().check(true); getStreamer().getLastException().check(true);
return; return;
} }
@ -775,20 +780,20 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers flushBuffer(); // flush from all upper layers
if (currentPacket != null) { if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket); getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null; currentPacket = null;
} }
if (streamer.getBytesCurBlock() != 0) { if (getStreamer().getBytesCurBlock() != 0) {
// send an empty packet to mark the end of the block // send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true); getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock); currentPacket.setSyncBlock(shouldSyncBlock);
} }
flushInternal(); // flush all data to Datanodes flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer // get last block before destroying the streamer
ExtendedBlock lastBlock = streamer.getBlock(); ExtendedBlock lastBlock = getStreamer().getBlock();
closeThreads(false); closeThreads(false);
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try { try {
@ -846,7 +851,7 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting @VisibleForTesting
public void setArtificialSlowdown(long period) { public void setArtificialSlowdown(long period) {
streamer.setArtificialSlowdown(period); getStreamer().setArtificialSlowdown(period);
} }
@VisibleForTesting @VisibleForTesting
@ -873,7 +878,7 @@ public class DFSOutputStream extends FSOutputSummer
* Returns the access token currently used by streamer, for testing only * Returns the access token currently used by streamer, for testing only
*/ */
synchronized Token<BlockTokenIdentifier> getBlockToken() { synchronized Token<BlockTokenIdentifier> getBlockToken() {
return streamer.getBlockToken(); return getStreamer().getBlockToken();
} }
@Override @Override
@ -890,11 +895,25 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting @VisibleForTesting
ExtendedBlock getBlock() { ExtendedBlock getBlock() {
return streamer.getBlock(); return getStreamer().getBlock();
} }
@VisibleForTesting @VisibleForTesting
public long getFileId() { public long getFileId() {
return fileId; return fileId;
} }
/**
* Set the data streamer object.
*/
protected synchronized void setStreamer(DataStreamer streamer) {
this.streamer = streamer;
}
/**
* Returns the data streamer object.
*/
protected synchronized DataStreamer getStreamer() {
return streamer;
}
} }