diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f01f047e052..6c3a3a7a8a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -437,6 +437,8 @@ Release 2.7.0 - UNRELEASED HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via Colin P. McCabe) + HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 0a8720a4bdf..a5983c73cdd 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -95,8 +95,11 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; +import org.apache.htrace.NullScope; +import org.apache.htrace.Sampler; import org.apache.htrace.Span; import org.apache.htrace.Trace; +import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; @@ -270,17 +273,11 @@ public class DFSOutputStream extends FSOutputSummer /** Append on an existing block? */ private final boolean isAppend; - private final Span traceSpan; - - /** - * construction with tracing info - */ - private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) { + private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) { isAppend = false; isLazyPersistFile = isLazyPersist(stat); this.block = block; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - traceSpan = span; } /** @@ -291,10 +288,9 @@ public class DFSOutputStream extends FSOutputSummer * @throws IOException if error occurs */ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, - int bytesPerChecksum, Span span) throws IOException { + int bytesPerChecksum) throws IOException { isAppend = true; stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - traceSpan = span; block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); @@ -385,12 +381,8 @@ public class DFSOutputStream extends FSOutputSummer @Override public void run() { long lastPacket = Time.now(); - TraceScope traceScope = null; - if (traceSpan != null) { - traceScope = Trace.continueSpan(traceSpan); - } + TraceScope scope = NullScope.INSTANCE; while (!streamerClosed && dfsClient.clientRunning) { - // if the Responder encountered an error, shutdown Responder if (hasError && response != null) { try { @@ -436,11 +428,18 @@ public class DFSOutputStream extends FSOutputSummer // get packet to be sent. if (dataQueue.isEmpty()) { one = createHeartbeatPacket(); + assert one != null; } else { one = dataQueue.getFirst(); // regular data packet + long parents[] = one.getTraceParents(); + if (parents.length > 0) { + scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); + // TODO: use setParents API once it's available from HTrace 3.2 +// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); +// scope.getSpan().setParents(parents); + } } } - assert one != null; // get new block from namenode. 
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { @@ -486,9 +485,12 @@ public class DFSOutputStream extends FSOutputSummer } // send the packet + Span span = null; synchronized (dataQueue) { // move packet from dataQueue to ackQueue if (!one.isHeartbeatPacket()) { + span = scope.detach(); + one.setTraceSpan(span); dataQueue.removeFirst(); ackQueue.addLast(one); dataQueue.notifyAll(); @@ -501,6 +503,7 @@ public class DFSOutputStream extends FSOutputSummer } // write out data to remote datanode + TraceScope writeScope = Trace.startSpan("writeTo", span); try { one.writeTo(blockStream); blockStream.flush(); @@ -513,6 +516,8 @@ public class DFSOutputStream extends FSOutputSummer // will be taken out then. tryMarkPrimaryDatanodeFailed(); throw e; + } finally { + writeScope.close(); } lastPacket = Time.now(); @@ -562,11 +567,10 @@ public class DFSOutputStream extends FSOutputSummer // Not a datanode issue streamerClosed = true; } + } finally { + scope.close(); } } - if (traceScope != null) { - traceScope.close(); - } closeInternal(); } @@ -721,6 +725,7 @@ public class DFSOutputStream extends FSOutputSummer setName("ResponseProcessor for block " + block); PipelineAck ack = new PipelineAck(); + TraceScope scope = NullScope.INSTANCE; while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { // process responses from datanodes. try { @@ -795,6 +800,8 @@ public class DFSOutputStream extends FSOutputSummer block.setNumBytes(one.getLastByteOffsetBlock()); synchronized (dataQueue) { + scope = Trace.continueSpan(one.getTraceSpan()); + one.setTraceSpan(null); lastAckedSeqno = seqno; ackQueue.removeFirst(); dataQueue.notifyAll(); @@ -819,6 +826,8 @@ public class DFSOutputStream extends FSOutputSummer } responderClosed = true; } + } finally { + scope.close(); } } } @@ -879,6 +888,12 @@ public class DFSOutputStream extends FSOutputSummer // a client waiting on close() will be aware that the flush finished. 
synchronized (dataQueue) { DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet + Span span = endOfBlockPacket.getTraceSpan(); + if (span != null) { + // Close any trace span associated with this Packet + TraceScope scope = Trace.continueSpan(span); + scope.close(); + } assert endOfBlockPacket.isLastPacketInBlock(); assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; lastAckedSeqno = endOfBlockPacket.getSeqno(); @@ -1586,11 +1601,7 @@ public class DFSOutputStream extends FSOutputSummer computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); - Span traceSpan = null; - if (Trace.isTracing()) { - traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach(); - } - streamer = new DataStreamer(stat, null, traceSpan); + streamer = new DataStreamer(stat, null); if (favoredNodes != null && favoredNodes.length != 0) { streamer.setFavoredNodes(favoredNodes); } @@ -1600,50 +1611,56 @@ public class DFSOutputStream extends FSOutputSummer FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException { - HdfsFileStatus stat = null; + TraceScope scope = + dfsClient.getPathTraceScope("newStreamForCreate", src); + try { + HdfsFileStatus stat = null; - // Retry the create if we get a RetryStartFileException up to a maximum - // number of times - boolean shouldRetry = true; - int retryCount = CREATE_RETRY_COUNT; - while (shouldRetry) { - shouldRetry = false; - try { - stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, - new EnumSetWritable(flag), createParent, replication, - blockSize, SUPPORTED_CRYPTO_VERSIONS); - break; - } catch (RemoteException re) { - IOException e = re.unwrapRemoteException( - AccessControlException.class, - DSQuotaExceededException.class, - FileAlreadyExistsException.class, - FileNotFoundException.class, - ParentNotDirectoryException.class, - NSQuotaExceededException.class, - RetryStartFileException.class, - SafeModeException.class, - UnresolvedPathException.class, - SnapshotAccessControlException.class, - UnknownCryptoProtocolVersionException.class); - if (e instanceof RetryStartFileException) { - if (retryCount > 0) { - shouldRetry = true; - retryCount--; + // Retry the create if we get a RetryStartFileException up to a maximum + // number of times + boolean shouldRetry = true; + int retryCount = CREATE_RETRY_COUNT; + while (shouldRetry) { + shouldRetry = false; + try { + stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, + new EnumSetWritable(flag), createParent, replication, + blockSize, SUPPORTED_CRYPTO_VERSIONS); + break; + } catch (RemoteException re) { + IOException e = re.unwrapRemoteException( + AccessControlException.class, + DSQuotaExceededException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + NSQuotaExceededException.class, + RetryStartFileException.class, + SafeModeException.class, + UnresolvedPathException.class, + SnapshotAccessControlException.class, + UnknownCryptoProtocolVersionException.class); + if (e instanceof RetryStartFileException) { + if (retryCount > 0) { + shouldRetry = true; + retryCount--; + } else { + throw new IOException("Too many retries because of encryption" + + " zone operations", e); + } } else { - throw new IOException("Too many retries because of encryption" + - " zone operations", e); + throw e; } - } else { - throw e; } } + 
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, + flag, progress, checksum, favoredNodes); + out.start(); + return out; + } finally { + scope.close(); } - Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, - flag, progress, checksum, favoredNodes); - out.start(); - return out; } /** Construct a new output stream for append. */ @@ -1653,21 +1670,16 @@ public class DFSOutputStream extends FSOutputSummer this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened - Span traceSpan = null; - if (Trace.isTracing()) { - traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach(); - } - // The last partial block of the file has to be filled. if (!toNewBlock && lastBlock != null) { // indicate that we are appending to an existing block bytesCurBlock = lastBlock.getBlockSize(); - streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan); + streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum); } else { computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); streamer = new DataStreamer(stat, - lastBlock != null ? lastBlock.getBlock() : null, traceSpan); + lastBlock != null ? lastBlock.getBlock() : null); } this.fileEncryptionInfo = stat.getFileEncryptionInfo(); } @@ -1676,13 +1688,19 @@ public class DFSOutputStream extends FSOutputSummer boolean toNewBlock, int bufferSize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock, - progress, lastBlock, stat, checksum); - if (favoredNodes != null && favoredNodes.length != 0) { - out.streamer.setFavoredNodes(favoredNodes); + TraceScope scope = + dfsClient.getPathTraceScope("newStreamForAppend", src); + try { + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock, + progress, lastBlock, stat, checksum); + if (favoredNodes != null && favoredNodes.length != 0) { + out.streamer.setFavoredNodes(favoredNodes); + } + out.start(); + return out; + } finally { + scope.close(); } - out.start(); - return out; } private static boolean isLazyPersist(HdfsFileStatus stat) { @@ -1707,6 +1725,7 @@ public class DFSOutputStream extends FSOutputSummer private void queueCurrentPacket() { synchronized (dataQueue) { if (currentPacket == null) return; + currentPacket.addTraceParent(Trace.currentSpan()); dataQueue.addLast(currentPacket); lastQueuedSeqno = currentPacket.getSeqno(); if (DFSClient.LOG.isDebugEnabled()) { @@ -1721,23 +1740,39 @@ public class DFSOutputStream extends FSOutputSummer synchronized (dataQueue) { try { // If queue is full, then wait till we have enough space - while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) { + boolean firstWait = true; try { - dataQueue.wait(); - } catch (InterruptedException e) { - // If we get interrupted while waiting to queue data, we still need to get rid - // of the current packet. This is because we have an invariant that if - // currentPacket gets full, it will get queued before the next writeChunk. - // - // Rather than wait around for space in the queue, we should instead try to - // return to the caller as soon as possible, even though we slightly overrun - // the MAX_PACKETS length. 
- Thread.currentThread().interrupt(); - break; + while (!isClosed() && dataQueue.size() + ackQueue.size() > + dfsClient.getConf().writeMaxPackets) { + if (firstWait) { + Span span = Trace.currentSpan(); + if (span != null) { + span.addTimelineAnnotation("dataQueue.wait"); + } + firstWait = false; + } + try { + dataQueue.wait(); + } catch (InterruptedException e) { + // If we get interrupted while waiting to queue data, we still need to get rid + // of the current packet. This is because we have an invariant that if + // currentPacket gets full, it will get queued before the next writeChunk. + // + // Rather than wait around for space in the queue, we should instead try to + // return to the caller as soon as possible, even though we slightly overrun + // the MAX_PACKETS length. + Thread.currentThread().interrupt(); + break; + } + } + } finally { + Span span = Trace.currentSpan(); + if ((span != null) && (!firstWait)) { + span.addTimelineAnnotation("end.wait"); + } } - } - checkClosed(); - queueCurrentPacket(); + checkClosed(); + queueCurrentPacket(); } catch (ClosedChannelException e) { } } @@ -1747,6 +1782,17 @@ public class DFSOutputStream extends FSOutputSummer @Override protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src); + try { + writeChunkImpl(b, offset, len, checksum, ckoff, cklen); + } finally { + scope.close(); + } + } + + private synchronized void writeChunkImpl(byte[] b, int offset, int len, + byte[] checksum, int ckoff, int cklen) throws IOException { dfsClient.checkOpen(); checkClosed(); @@ -1835,12 +1881,24 @@ public class DFSOutputStream extends FSOutputSummer */ @Override public void hflush() throws IOException { - flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); + TraceScope scope = + dfsClient.getPathTraceScope("hflush", src); + try { + flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } } @Override public void hsync() throws IOException { - hsync(EnumSet.noneOf(SyncFlag.class)); + TraceScope scope = + dfsClient.getPathTraceScope("hsync", src); + try { + flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); + } finally { + scope.close(); + } } /** @@ -1857,7 +1915,13 @@ public class DFSOutputStream extends FSOutputSummer * whether or not to update the block length in NameNode. 
*/ public void hsync(EnumSet syncFlags) throws IOException { - flushOrSync(true, syncFlags); + TraceScope scope = + dfsClient.getPathTraceScope("hsync", src); + try { + flushOrSync(true, syncFlags); + } finally { + scope.close(); + } } /** @@ -2038,33 +2102,38 @@ public class DFSOutputStream extends FSOutputSummer } private void waitForAckedSeqno(long seqno) throws IOException { - if (DFSClient.LOG.isDebugEnabled()) { - DFSClient.LOG.debug("Waiting for ack for: " + seqno); - } - long begin = Time.monotonicNow(); + TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); try { - synchronized (dataQueue) { - while (!isClosed()) { - checkClosed(); - if (lastAckedSeqno >= seqno) { - break; - } - try { - dataQueue.wait(1000); // when we receive an ack, we notify on - // dataQueue - } catch (InterruptedException ie) { - throw new InterruptedIOException( - "Interrupted while waiting for data to be acknowledged by pipeline"); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("Waiting for ack for: " + seqno); + } + long begin = Time.monotonicNow(); + try { + synchronized (dataQueue) { + while (!isClosed()) { + checkClosed(); + if (lastAckedSeqno >= seqno) { + break; + } + try { + dataQueue.wait(1000); // when we receive an ack, we notify on + // dataQueue + } catch (InterruptedException ie) { + throw new InterruptedIOException( + "Interrupted while waiting for data to be acknowledged by pipeline"); + } } } + checkClosed(); + } catch (ClosedChannelException e) { } - checkClosed(); - } catch (ClosedChannelException e) { - } - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs) { - DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); + long duration = Time.monotonicNow() - begin; + if (duration > dfsclientSlowLogThresholdMs) { + DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration + + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); + } + } finally { + scope.close(); } } @@ -2129,6 +2198,16 @@ public class DFSOutputStream extends FSOutputSummer */ @Override public synchronized void close() throws IOException { + TraceScope scope = + dfsClient.getPathTraceScope("DFSOutputStream#close", src); + try { + closeImpl(); + } finally { + scope.close(); + } + } + + private synchronized void closeImpl() throws IOException { if (isClosed()) { IOException e = lastException.getAndSet(null); if (e == null) @@ -2154,7 +2233,12 @@ public class DFSOutputStream extends FSOutputSummer // get last block before destroying the streamer ExtendedBlock lastBlock = streamer.getBlock(); closeThreads(false); - completeFile(lastBlock); + TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + try { + completeFile(lastBlock); + } finally { + scope.close(); + } dfsClient.endFileLease(fileId); } catch (ClosedChannelException e) { } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 9b3ea515b4f..7e7f7801b70 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -21,9 +21,12 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.channels.ClosedChannelException; +import java.util.Arrays; + import 
org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.util.ByteArrayManager; +import org.apache.htrace.Span; /**************************************************************** * DFSPacket is used by DataStreamer and DFSOutputStream. @@ -33,6 +36,7 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager; class DFSPacket { public static final long HEART_BEAT_SEQNO = -1L; + private static long[] EMPTY = new long[0]; private final long seqno; // sequence number of buffer in block private final long offsetInBlock; // offset in block private boolean syncBlock; // this packet forces the current block to disk @@ -59,6 +63,9 @@ class DFSPacket { private int checksumPos; private final int dataStart; private int dataPos; + private long[] traceParents = EMPTY; + private int traceParentsUsed; + private Span span; /** * Create a new packet. @@ -267,4 +274,70 @@ class DFSPacket { " lastPacketInBlock: " + this.lastPacketInBlock + " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); } + + /** + * Add a trace parent span for this packet.
+ * + * Trace parent spans for a packet are the trace spans responsible for + * adding data to that packet. We store them as an array of longs for + * efficiency.
+ * + * Protected by the DFSOutputStream dataQueue lock. + */ + public void addTraceParent(Span span) { + if (span == null) { + return; + } + addTraceParent(span.getSpanId()); + } + + public void addTraceParent(long id) { + if (traceParentsUsed == traceParents.length) { + int newLength = (traceParents.length == 0) ? 8 : + traceParents.length * 2; + traceParents = Arrays.copyOf(traceParents, newLength); + } + traceParents[traceParentsUsed] = id; + traceParentsUsed++; + } + + /** + * Get the trace parent spans for this packet.
+ * + * The returned array will always be non-null, though it may be empty.
+ * + * Protected by the DFSOutputStream dataQueue lock. + */ + public long[] getTraceParents() { + // Remove duplicates from the array. + int len = traceParentsUsed; + Arrays.sort(traceParents, 0, len); + int i = 0, j = 0; + long prevVal = 0; // 0 is not a valid span id + while (true) { + if (i == len) { + break; + } + long val = traceParents[i]; + if (val != prevVal) { + traceParents[j] = val; + j++; + prevVal = val; + } + i++; + } + if (j < traceParents.length) { + traceParents = Arrays.copyOf(traceParents, j); + traceParentsUsed = traceParents.length; + } + return traceParents; + } + + public void setTraceSpan(Span span) { + this.span = span; + } + + public Span getTraceSpan() { + return span; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java index 8bf60971b3d..daee6083ebb 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java @@ -65,4 +65,29 @@ public class TestDFSPacket { } } } + + @Test + public void testAddParentsGetParents() throws Exception { + DFSPacket p = new DFSPacket(null, maxChunksPerPacket, + 0, 0, checksumSize, false); + long parents[] = p.getTraceParents(); + Assert.assertEquals(0, parents.length); + p.addTraceParent(123); + p.addTraceParent(123); + parents = p.getTraceParents(); + Assert.assertEquals(1, parents.length); + Assert.assertEquals(123, parents[0]); + parents = p.getTraceParents(); // test calling 'get' again. + Assert.assertEquals(1, parents.length); + Assert.assertEquals(123, parents[0]); + p.addTraceParent(1); + p.addTraceParent(456); + p.addTraceParent(789); + parents = p.getTraceParents(); + Assert.assertEquals(4, parents.length); + Assert.assertEquals(1, parents[0]); + Assert.assertEquals(123, parents[1]); + Assert.assertEquals(456, parents[2]); + Assert.assertEquals(789, parents[3]); + } }
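
For reviewers who want to see the new spans end to end, the sketch below shows one way to exercise them from a client. It is not part of this patch: it assumes an HTrace span receiver is already configured on the client side, and the span name, class name, and file path are illustrative. Only HTrace calls that already appear in the patch (Trace.startSpan, Sampler, TraceScope) are used.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.htrace.Sampler;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

public class DFSOutputStreamTraceExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Open a top-level span around the write. With this patch, the child
    // spans are emitted per packet ("DFSOutputStream#writeChunk",
    // "dataStreamer", "writeTo", "hflush", "DFSOutputStream#close")
    // rather than as one span that lives for the lifetime of the stream.
    TraceScope scope = Trace.startSpan("exampleWrite", Sampler.ALWAYS);
    try {
      FSDataOutputStream out = fs.create(new Path("/tmp/trace-example"));
      try {
        // Write enough data to cover more than one packet at the default
        // packet size, so several dataStreamer/writeTo spans show up.
        out.write(new byte[256 * 1024]);
        out.hflush();
      } finally {
        out.close();
      }
    } finally {
      scope.close();
    }
  }
}

With tracing enabled this way, each DFSPacket records the span that queued its data via addTraceParent(), the DataStreamer starts its "dataStreamer" span from that parent and detaches it onto the packet, and the ResponseProcessor continues the same span when the ack arrives, which is what ties the per-packet spans in this change back to the client calls that produced them.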