From 379ece15feafe793265f7fc0e85da39a8469b32a Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Tue, 2 Jun 2015 15:39:24 -0700 Subject: [PATCH] HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R. (cherry picked from commit efc510a570cf880e7df1b69932aa41932658ee51) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../apache/hadoop/hdfs/DFSOutputStream.java | 161 ++++++++++-------- 2 files changed, 93 insertions(+), 71 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 68fd11b082c..cad0e86f3ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -251,6 +251,9 @@ Release 2.8.0 - UNRELEASED HDFS-8489. Subclass BlockInfo to represent contiguous blocks. (Zhe Zhang via jing9) + HDFS-8386. Improve synchronization of 'streamer' reference in + DFSOutputStream. (Rakesh R via wang) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 4925091529b..297329e7154 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -139,7 +139,7 @@ public class DFSOutputStream extends FSOutputSummer @Override protected void checkClosed() throws IOException { if (isClosed()) { - streamer.getLastException().throwException4Close(); + getStreamer().getLastException().throwException4Close(); } } @@ -148,10 +148,10 @@ public class DFSOutputStream extends FSOutputSummer // @VisibleForTesting public synchronized DatanodeInfo[] getPipeline() { - if (streamer.streamerClosed()) { + if (getStreamer().streamerClosed()) { return null; } - DatanodeInfo[] currentNodes = streamer.getNodes(); + DatanodeInfo[] currentNodes = getStreamer().getNodes(); if (currentNodes == null) { return null; } @@ -293,9 +293,9 @@ public class DFSOutputStream extends FSOutputSummer // indicate that we are appending to an existing block streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); - streamer.setBytesCurBlock(lastBlock.getBlockSize()); + getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); adjustPacketChunkSize(stat); - streamer.setPipelineInConstruction(lastBlock); + getStreamer().setPipelineInConstruction(lastBlock); } else { computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); @@ -329,7 +329,7 @@ public class DFSOutputStream extends FSOutputSummer // computePacketChunkSize(0, freeInCksum); setChecksumBufSize(freeInCksum); - streamer.setAppendChunk(true); + getStreamer().setAppendChunk(true); } else { // if the remaining space in the block is smaller than // that expected size of of a packet, then create @@ -392,36 +392,36 @@ public class DFSOutputStream extends FSOutputSummer } if (currentPacket == null) { - currentPacket = createPacket(packetSize, chunksPerPacket, - streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); + currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer() + .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.getSeqno() + ", src=" + src + ", packetSize=" + packetSize + ", chunksPerPacket=" + chunksPerPacket + - ", bytesCurBlock=" + streamer.getBytesCurBlock()); + ", bytesCurBlock=" + getStreamer().getBytesCurBlock()); } } currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); currentPacket.incNumChunks(); - streamer.incBytesCurBlock(len); + getStreamer().incBytesCurBlock(len); // If packet is full, enqueue it for transmission // if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || - streamer.getBytesCurBlock() == blockSize) { + getStreamer().getBytesCurBlock() == blockSize) { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + currentPacket.getSeqno() + ", src=" + src + - ", bytesCurBlock=" + streamer.getBytesCurBlock() + + ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", blockSize=" + blockSize + - ", appendChunk=" + streamer.getAppendChunk()); + ", appendChunk=" + getStreamer().getAppendChunk()); } - streamer.waitAndQueuePacket(currentPacket); + getStreamer().waitAndQueuePacket(currentPacket); currentPacket = null; adjustChunkBoundary(); @@ -436,14 +436,14 @@ public class DFSOutputStream extends FSOutputSummer * crc chunks from now on. */ protected void adjustChunkBoundary() { - if (streamer.getAppendChunk() && - streamer.getBytesCurBlock() % bytesPerChecksum == 0) { - streamer.setAppendChunk(false); + if (getStreamer().getAppendChunk() && + getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) { + getStreamer().setAppendChunk(false); resetChecksumBufSize(); } - if (!streamer.getAppendChunk()) { - int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()), + if (!getStreamer().getAppendChunk()) { + int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()), dfsClient.getConf().getWritePacketSize()); computePacketChunkSize(psize, bytesPerChecksum); } @@ -456,13 +456,13 @@ public class DFSOutputStream extends FSOutputSummer * @throws IOException */ protected void endBlock() throws IOException { - if (streamer.getBytesCurBlock() == blockSize) { - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); + if (getStreamer().getBytesCurBlock() == blockSize) { + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); - streamer.waitAndQueuePacket(currentPacket); + getStreamer().waitAndQueuePacket(currentPacket); currentPacket = null; - streamer.setBytesCurBlock(0); + getStreamer().setBytesCurBlock(0); lastFlushOffset = 0; } } @@ -555,31 +555,34 @@ public class DFSOutputStream extends FSOutputSummer // bytesCurBlock potentially incremented if there was buffered data if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("DFSClient flush():" - + " bytesCurBlock=" + streamer.getBytesCurBlock() + DFSClient.LOG.debug("DFSClient flush(): " + + " bytesCurBlock=" + getStreamer().getBytesCurBlock() + " lastFlushOffset=" + lastFlushOffset + " createNewBlock=" + endBlock); } // Flush only if we haven't already flushed till this offset. - if (lastFlushOffset != streamer.getBytesCurBlock()) { - assert streamer.getBytesCurBlock() > lastFlushOffset; + if (lastFlushOffset != getStreamer().getBytesCurBlock()) { + assert getStreamer().getBytesCurBlock() > lastFlushOffset; // record the valid offset of this flush - lastFlushOffset = streamer.getBytesCurBlock(); + lastFlushOffset = getStreamer().getBytesCurBlock(); if (isSync && currentPacket == null && !endBlock) { // Nothing to send right now, // but sync was requested. // Send an empty packet if we do not end the block right now currentPacket = createPacket(packetSize, chunksPerPacket, - streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); } } else { - if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) { + if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) { // Nothing to send right now, // and the block was partially written, // and sync was requested. - // So send an empty sync packet if we do not end the block right now + // So send an empty sync packet if we do not end the block right + // now currentPacket = createPacket(packetSize, chunksPerPacket, - streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false); + getStreamer().getBytesCurBlock(), getStreamer() + .getAndIncCurrentSeqno(), false); } else if (currentPacket != null) { // just discard the current packet since it is already been sent. currentPacket.releaseBuffer(byteArrayManager); @@ -588,42 +591,44 @@ public class DFSOutputStream extends FSOutputSummer } if (currentPacket != null) { currentPacket.setSyncBlock(isSync); - streamer.waitAndQueuePacket(currentPacket); + getStreamer().waitAndQueuePacket(currentPacket); currentPacket = null; } - if (endBlock && streamer.getBytesCurBlock() > 0) { + if (endBlock && getStreamer().getBytesCurBlock() > 0) { // Need to end the current block, thus send an empty packet to // indicate this is the end of the block and reset bytesCurBlock - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock || isSync); - streamer.waitAndQueuePacket(currentPacket); + getStreamer().waitAndQueuePacket(currentPacket); currentPacket = null; - streamer.setBytesCurBlock(0); + getStreamer().setBytesCurBlock(0); lastFlushOffset = 0; } else { // Restore state of stream. Record the last flush offset // of the last full chunk that was flushed. - streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept); + getStreamer().setBytesCurBlock( + getStreamer().getBytesCurBlock() - numKept); } - toWaitFor = streamer.getLastQueuedSeqno(); + toWaitFor = getStreamer().getLastQueuedSeqno(); } // end synchronized - streamer.waitForAckedSeqno(toWaitFor); + getStreamer().waitForAckedSeqno(toWaitFor); // update the block length first time irrespective of flag - if (updateLength || streamer.getPersistBlocks().get()) { + if (updateLength || getStreamer().getPersistBlocks().get()) { synchronized (this) { - if (!streamer.streamerClosed() && streamer.getBlock() != null) { - lastBlockLength = streamer.getBlock().getNumBytes(); + if (!getStreamer().streamerClosed() + && getStreamer().getBlock() != null) { + lastBlockLength = getStreamer().getBlock().getNumBytes(); } } } // If 1) any new blocks were allocated since the last flush, or 2) to // update length in NN is required, then persist block locations on // namenode. - if (streamer.getPersistBlocks().getAndSet(false) || updateLength) { + if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) { try { dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, lastBlockLength); @@ -640,8 +645,8 @@ public class DFSOutputStream extends FSOutputSummer } synchronized(this) { - if (!streamer.streamerClosed()) { - streamer.setHflush(); + if (!getStreamer().streamerClosed()) { + getStreamer().setHflush(); } } } catch (InterruptedIOException interrupt) { @@ -653,7 +658,7 @@ public class DFSOutputStream extends FSOutputSummer DFSClient.LOG.warn("Error while syncing", e); synchronized (this) { if (!isClosed()) { - streamer.getLastException().set(e); + getStreamer().getLastException().set(e); closeThreads(true); } } @@ -678,10 +683,10 @@ public class DFSOutputStream extends FSOutputSummer public synchronized int getCurrentBlockReplication() throws IOException { dfsClient.checkOpen(); checkClosed(); - if (streamer.streamerClosed()) { + if (getStreamer().streamerClosed()) { return blockReplication; // no pipeline, return repl factor of file } - DatanodeInfo[] currentNodes = streamer.getNodes(); + DatanodeInfo[] currentNodes = getStreamer().getNodes(); if (currentNodes == null) { return blockReplication; // no pipeline, return repl factor of file } @@ -700,16 +705,16 @@ public class DFSOutputStream extends FSOutputSummer // // If there is data in the current buffer, send it across // - streamer.queuePacket(currentPacket); + getStreamer().queuePacket(currentPacket); currentPacket = null; - toWaitFor = streamer.getLastQueuedSeqno(); + toWaitFor = getStreamer().getLastQueuedSeqno(); } - streamer.waitForAckedSeqno(toWaitFor); + getStreamer().waitForAckedSeqno(toWaitFor); } protected synchronized void start() { - streamer.start(); + getStreamer().start(); } /** @@ -720,32 +725,32 @@ public class DFSOutputStream extends FSOutputSummer if (isClosed()) { return; } - streamer.getLastException().set(new IOException("Lease timeout of " + getStreamer().getLastException().set(new IOException("Lease timeout of " + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired.")); closeThreads(true); dfsClient.endFileLease(fileId); } boolean isClosed() { - return closed || streamer.streamerClosed(); + return closed || getStreamer().streamerClosed(); } void setClosed() { closed = true; - streamer.release(); + getStreamer().release(); } // shutdown datastreamer and responseprocessor threads. // interrupt datastreamer if force is true protected void closeThreads(boolean force) throws IOException { try { - streamer.close(force); - streamer.join(); - streamer.closeSocket(); + getStreamer().close(force); + getStreamer().join(); + getStreamer().closeSocket(); } catch (InterruptedException e) { throw new IOException("Failed to shutdown streamer"); } finally { - streamer.setSocketToNull(); + getStreamer().setSocketToNull(); setClosed(); } } @@ -767,7 +772,7 @@ public class DFSOutputStream extends FSOutputSummer protected synchronized void closeImpl() throws IOException { if (isClosed()) { - streamer.getLastException().check(true); + getStreamer().getLastException().check(true); return; } @@ -775,20 +780,20 @@ public class DFSOutputStream extends FSOutputSummer flushBuffer(); // flush from all upper layers if (currentPacket != null) { - streamer.waitAndQueuePacket(currentPacket); + getStreamer().waitAndQueuePacket(currentPacket); currentPacket = null; } - if (streamer.getBytesCurBlock() != 0) { + if (getStreamer().getBytesCurBlock() != 0) { // send an empty packet to mark the end of the block - currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(), - streamer.getAndIncCurrentSeqno(), true); + currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), + getStreamer().getAndIncCurrentSeqno(), true); currentPacket.setSyncBlock(shouldSyncBlock); } flushInternal(); // flush all data to Datanodes // get last block before destroying the streamer - ExtendedBlock lastBlock = streamer.getBlock(); + ExtendedBlock lastBlock = getStreamer().getBlock(); closeThreads(false); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); try { @@ -846,7 +851,7 @@ public class DFSOutputStream extends FSOutputSummer @VisibleForTesting public void setArtificialSlowdown(long period) { - streamer.setArtificialSlowdown(period); + getStreamer().setArtificialSlowdown(period); } @VisibleForTesting @@ -873,7 +878,7 @@ public class DFSOutputStream extends FSOutputSummer * Returns the access token currently used by streamer, for testing only */ synchronized Token getBlockToken() { - return streamer.getBlockToken(); + return getStreamer().getBlockToken(); } @Override @@ -890,11 +895,25 @@ public class DFSOutputStream extends FSOutputSummer @VisibleForTesting ExtendedBlock getBlock() { - return streamer.getBlock(); + return getStreamer().getBlock(); } @VisibleForTesting public long getFileId() { return fileId; } + + /** + * Set the data streamer object. + */ + protected synchronized void setStreamer(DataStreamer streamer) { + this.streamer = streamer; + } + + /** + * Returns the data streamer object. + */ + protected synchronized DataStreamer getStreamer() { + return streamer; + } }