HDFS-8386. Improve synchronization of 'streamer' reference in DFSOutputStream. Contributed by Rakesh R.

This commit is contained in:
Andrew Wang 2015-06-02 15:39:24 -07:00
parent 03fb5c6425
commit efc510a570
2 changed files with 92 additions and 70 deletions

View File

@ -591,6 +591,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8489. Subclass BlockInfo to represent contiguous blocks.
(Zhe Zhang via jing9)
HDFS-8386. Improve synchronization of 'streamer' reference in
DFSOutputStream. (Rakesh R via wang)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -139,7 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
@Override
protected void checkClosed() throws IOException {
if (isClosed()) {
streamer.getLastException().throwException4Close();
getStreamer().getLastException().throwException4Close();
}
}
@ -148,10 +148,10 @@ public class DFSOutputStream extends FSOutputSummer
//
@VisibleForTesting
public synchronized DatanodeInfo[] getPipeline() {
if (streamer.streamerClosed()) {
if (getStreamer().streamerClosed()) {
return null;
}
DatanodeInfo[] currentNodes = streamer.getNodes();
DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) {
return null;
}
@ -293,9 +293,9 @@ public class DFSOutputStream extends FSOutputSummer
// indicate that we are appending to an existing block
streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
cachingStrategy, byteArrayManager);
streamer.setBytesCurBlock(lastBlock.getBlockSize());
getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
adjustPacketChunkSize(stat);
streamer.setPipelineInConstruction(lastBlock);
getStreamer().setPipelineInConstruction(lastBlock);
} else {
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
@ -329,7 +329,7 @@ public class DFSOutputStream extends FSOutputSummer
//
computePacketChunkSize(0, freeInCksum);
setChecksumBufSize(freeInCksum);
streamer.setAppendChunk(true);
getStreamer().setAppendChunk(true);
} else {
// if the remaining space in the block is smaller than
// that expected size of of a packet, then create
@ -392,36 +392,36 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket == null) {
currentPacket = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
.getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", packetSize=" + packetSize +
", chunksPerPacket=" + chunksPerPacket +
", bytesCurBlock=" + streamer.getBytesCurBlock());
", bytesCurBlock=" + getStreamer().getBytesCurBlock());
}
}
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.incNumChunks();
streamer.incBytesCurBlock(len);
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
streamer.getBytesCurBlock() == blockSize) {
getStreamer().getBytesCurBlock() == blockSize) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
currentPacket.getSeqno() +
", src=" + src +
", bytesCurBlock=" + streamer.getBytesCurBlock() +
", bytesCurBlock=" + getStreamer().getBytesCurBlock() +
", blockSize=" + blockSize +
", appendChunk=" + streamer.getAppendChunk());
", appendChunk=" + getStreamer().getAppendChunk());
}
streamer.waitAndQueuePacket(currentPacket);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
adjustChunkBoundary();
@ -436,14 +436,14 @@ public class DFSOutputStream extends FSOutputSummer
* crc chunks from now on.
*/
protected void adjustChunkBoundary() {
if (streamer.getAppendChunk() &&
streamer.getBytesCurBlock() % bytesPerChecksum == 0) {
streamer.setAppendChunk(false);
if (getStreamer().getAppendChunk() &&
getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
getStreamer().setAppendChunk(false);
resetChecksumBufSize();
}
if (!streamer.getAppendChunk()) {
int psize = Math.min((int)(blockSize- streamer.getBytesCurBlock()),
if (!getStreamer().getAppendChunk()) {
int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
dfsClient.getConf().getWritePacketSize());
computePacketChunkSize(psize, bytesPerChecksum);
}
@ -456,13 +456,13 @@ public class DFSOutputStream extends FSOutputSummer
* @throws IOException
*/
protected void endBlock() throws IOException {
if (streamer.getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
if (getStreamer().getBytesCurBlock() == blockSize) {
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
streamer.waitAndQueuePacket(currentPacket);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
streamer.setBytesCurBlock(0);
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
}
}
@ -551,30 +551,33 @@ public class DFSOutputStream extends FSOutputSummer
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient flush(): "
+ " bytesCurBlock=" + streamer.getBytesCurBlock()
+ " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+ " lastFlushOffset=" + lastFlushOffset
+ " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != streamer.getBytesCurBlock()) {
assert streamer.getBytesCurBlock() > lastFlushOffset;
if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
assert getStreamer().getBytesCurBlock() > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = streamer.getBytesCurBlock();
lastFlushOffset = getStreamer().getBytesCurBlock();
if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now,
// but sync was requested.
// Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
getStreamer().getBytesCurBlock(), getStreamer()
.getAndIncCurrentSeqno(), false);
}
} else {
if (isSync && streamer.getBytesCurBlock() > 0 && !endBlock) {
if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
// So send an empty sync packet if we do not end the block right now
// So send an empty sync packet if we do not end the block right
// now
currentPacket = createPacket(packetSize, chunksPerPacket,
streamer.getBytesCurBlock(), streamer.getAndIncCurrentSeqno(), false);
getStreamer().getBytesCurBlock(), getStreamer()
.getAndIncCurrentSeqno(), false);
} else if (currentPacket != null) {
// just discard the current packet since it is already been sent.
currentPacket.releaseBuffer(byteArrayManager);
@ -583,42 +586,44 @@ public class DFSOutputStream extends FSOutputSummer
}
if (currentPacket != null) {
currentPacket.setSyncBlock(isSync);
streamer.waitAndQueuePacket(currentPacket);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
}
if (endBlock && streamer.getBytesCurBlock() > 0) {
if (endBlock && getStreamer().getBytesCurBlock() > 0) {
// Need to end the current block, thus send an empty packet to
// indicate this is the end of the block and reset bytesCurBlock
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock || isSync);
streamer.waitAndQueuePacket(currentPacket);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
streamer.setBytesCurBlock(0);
getStreamer().setBytesCurBlock(0);
lastFlushOffset = 0;
} else {
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
streamer.setBytesCurBlock(streamer.getBytesCurBlock() - numKept);
getStreamer().setBytesCurBlock(
getStreamer().getBytesCurBlock() - numKept);
}
toWaitFor = streamer.getLastQueuedSeqno();
toWaitFor = getStreamer().getLastQueuedSeqno();
} // end synchronized
streamer.waitForAckedSeqno(toWaitFor);
getStreamer().waitForAckedSeqno(toWaitFor);
// update the block length first time irrespective of flag
if (updateLength || streamer.getPersistBlocks().get()) {
if (updateLength || getStreamer().getPersistBlocks().get()) {
synchronized (this) {
if (!streamer.streamerClosed() && streamer.getBlock() != null) {
lastBlockLength = streamer.getBlock().getNumBytes();
if (!getStreamer().streamerClosed()
&& getStreamer().getBlock() != null) {
lastBlockLength = getStreamer().getBlock().getNumBytes();
}
}
}
// If 1) any new blocks were allocated since the last flush, or 2) to
// update length in NN is required, then persist block locations on
// namenode.
if (streamer.getPersistBlocks().getAndSet(false) || updateLength) {
if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
try {
dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
lastBlockLength);
@ -635,8 +640,8 @@ public class DFSOutputStream extends FSOutputSummer
}
synchronized(this) {
if (!streamer.streamerClosed()) {
streamer.setHflush();
if (!getStreamer().streamerClosed()) {
getStreamer().setHflush();
}
}
} catch (InterruptedIOException interrupt) {
@ -648,7 +653,7 @@ public class DFSOutputStream extends FSOutputSummer
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
if (!isClosed()) {
streamer.getLastException().set(e);
getStreamer().getLastException().set(e);
closeThreads(true);
}
}
@ -673,10 +678,10 @@ public class DFSOutputStream extends FSOutputSummer
public synchronized int getCurrentBlockReplication() throws IOException {
dfsClient.checkOpen();
checkClosed();
if (streamer.streamerClosed()) {
if (getStreamer().streamerClosed()) {
return blockReplication; // no pipeline, return repl factor of file
}
DatanodeInfo[] currentNodes = streamer.getNodes();
DatanodeInfo[] currentNodes = getStreamer().getNodes();
if (currentNodes == null) {
return blockReplication; // no pipeline, return repl factor of file
}
@ -695,16 +700,16 @@ public class DFSOutputStream extends FSOutputSummer
//
// If there is data in the current buffer, send it across
//
streamer.queuePacket(currentPacket);
getStreamer().queuePacket(currentPacket);
currentPacket = null;
toWaitFor = streamer.getLastQueuedSeqno();
toWaitFor = getStreamer().getLastQueuedSeqno();
}
streamer.waitForAckedSeqno(toWaitFor);
getStreamer().waitForAckedSeqno(toWaitFor);
}
protected synchronized void start() {
streamer.start();
getStreamer().start();
}
/**
@ -715,32 +720,32 @@ public class DFSOutputStream extends FSOutputSummer
if (isClosed()) {
return;
}
streamer.getLastException().set(new IOException("Lease timeout of "
getStreamer().getLastException().set(new IOException("Lease timeout of "
+ (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(fileId);
}
boolean isClosed() {
return closed || streamer.streamerClosed();
return closed || getStreamer().streamerClosed();
}
void setClosed() {
closed = true;
streamer.release();
getStreamer().release();
}
// shutdown datastreamer and responseprocessor threads.
// interrupt datastreamer if force is true
protected void closeThreads(boolean force) throws IOException {
try {
streamer.close(force);
streamer.join();
streamer.closeSocket();
getStreamer().close(force);
getStreamer().join();
getStreamer().closeSocket();
} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
streamer.setSocketToNull();
getStreamer().setSocketToNull();
setClosed();
}
}
@ -762,7 +767,7 @@ public class DFSOutputStream extends FSOutputSummer
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
streamer.getLastException().check(true);
getStreamer().getLastException().check(true);
return;
}
@ -770,20 +775,20 @@ public class DFSOutputStream extends FSOutputSummer
flushBuffer(); // flush from all upper layers
if (currentPacket != null) {
streamer.waitAndQueuePacket(currentPacket);
getStreamer().waitAndQueuePacket(currentPacket);
currentPacket = null;
}
if (streamer.getBytesCurBlock() != 0) {
if (getStreamer().getBytesCurBlock() != 0) {
// send an empty packet to mark the end of the block
currentPacket = createPacket(0, 0, streamer.getBytesCurBlock(),
streamer.getAndIncCurrentSeqno(), true);
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
currentPacket.setSyncBlock(shouldSyncBlock);
}
flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer
ExtendedBlock lastBlock = streamer.getBlock();
ExtendedBlock lastBlock = getStreamer().getBlock();
closeThreads(false);
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
@ -841,7 +846,7 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
public void setArtificialSlowdown(long period) {
streamer.setArtificialSlowdown(period);
getStreamer().setArtificialSlowdown(period);
}
@VisibleForTesting
@ -868,7 +873,7 @@ public class DFSOutputStream extends FSOutputSummer
* Returns the access token currently used by streamer, for testing only
*/
synchronized Token<BlockTokenIdentifier> getBlockToken() {
return streamer.getBlockToken();
return getStreamer().getBlockToken();
}
@Override
@ -885,11 +890,25 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
ExtendedBlock getBlock() {
return streamer.getBlock();
return getStreamer().getBlock();
}
@VisibleForTesting
public long getFileId() {
return fileId;
}
/**
* Set the data streamer object.
*/
protected synchronized void setStreamer(DataStreamer streamer) {
this.streamer = streamer;
}
/**
* Returns the data streamer object.
*/
protected synchronized DataStreamer getStreamer() {
return streamer;
}
}