HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)

(cherry picked from commit 8234fd0e10)
(cherry picked from commit 79c07bbaca)
This commit is contained in:
Colin Patrick Mccabe 2015-03-18 18:06:17 -07:00
parent 388696c089
commit 94976cb369
4 changed files with 300 additions and 116 deletions

View File

@ -437,6 +437,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via
Colin P. McCabe)
HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -95,8 +95,11 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.apache.htrace.NullScope;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
@ -270,17 +273,11 @@ public class DFSOutputStream extends FSOutputSummer
/** Append on an existing block? */
private final boolean isAppend;
private final Span traceSpan;
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
/**
@ -291,10 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
* @throws IOException if error occurs
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum, Span span) throws IOException {
int bytesPerChecksum) throws IOException {
isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
traceSpan = span;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
@ -385,12 +381,8 @@ public class DFSOutputStream extends FSOutputSummer
@Override
public void run() {
long lastPacket = Time.now();
TraceScope traceScope = null;
if (traceSpan != null) {
traceScope = Trace.continueSpan(traceSpan);
}
TraceScope scope = NullScope.INSTANCE;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (hasError && response != null) {
try {
@ -436,11 +428,18 @@ public class DFSOutputStream extends FSOutputSummer
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
assert one != null;
} else {
one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
if (parents.length > 0) {
scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
// TODO: use setParents API once it's available from HTrace 3.2
// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
// scope.getSpan().setParents(parents);
}
}
}
assert one != null;
// get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@ -486,9 +485,12 @@ public class DFSOutputStream extends FSOutputSummer
}
// send the packet
Span span = null;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
span = scope.detach();
one.setTraceSpan(span);
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
@ -501,6 +503,7 @@ public class DFSOutputStream extends FSOutputSummer
}
// write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try {
one.writeTo(blockStream);
blockStream.flush();
@ -513,6 +516,8 @@ public class DFSOutputStream extends FSOutputSummer
// will be taken out then.
tryMarkPrimaryDatanodeFailed();
throw e;
} finally {
writeScope.close();
}
lastPacket = Time.now();
@ -562,11 +567,10 @@ public class DFSOutputStream extends FSOutputSummer
// Not a datanode issue
streamerClosed = true;
}
} finally {
scope.close();
}
}
if (traceScope != null) {
traceScope.close();
}
closeInternal();
}
@ -721,6 +725,7 @@ public class DFSOutputStream extends FSOutputSummer
setName("ResponseProcessor for block " + block);
PipelineAck ack = new PipelineAck();
TraceScope scope = NullScope.INSTANCE;
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
@ -795,6 +800,8 @@ public class DFSOutputStream extends FSOutputSummer
block.setNumBytes(one.getLastByteOffsetBlock());
synchronized (dataQueue) {
scope = Trace.continueSpan(one.getTraceSpan());
one.setTraceSpan(null);
lastAckedSeqno = seqno;
ackQueue.removeFirst();
dataQueue.notifyAll();
@ -819,6 +826,8 @@ public class DFSOutputStream extends FSOutputSummer
}
responderClosed = true;
}
} finally {
scope.close();
}
}
}
@ -879,6 +888,12 @@ public class DFSOutputStream extends FSOutputSummer
// a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) {
DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
Span span = endOfBlockPacket.getTraceSpan();
if (span != null) {
// Close any trace span associated with this Packet
TraceScope scope = Trace.continueSpan(span);
scope.close();
}
assert endOfBlockPacket.isLastPacketInBlock();
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
lastAckedSeqno = endOfBlockPacket.getSeqno();
@ -1586,11 +1601,7 @@ public class DFSOutputStream extends FSOutputSummer
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
streamer = new DataStreamer(stat, null, traceSpan);
streamer = new DataStreamer(stat, null);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@ -1600,50 +1611,56 @@ public class DFSOutputStream extends FSOutputSummer
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException {
HdfsFileStatus stat = null;
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForCreate", src);
try {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
throw e;
}
} else {
throw e;
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
out.start();
return out;
} finally {
scope.close();
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
out.start();
return out;
}
/** Construct a new output stream for append. */
@ -1653,21 +1670,16 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
// The last partial block of the file has to be filled.
if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
bytesPerChecksum);
streamer = new DataStreamer(stat,
lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
lastBlock != null ? lastBlock.getBlock() : null);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
@ -1676,13 +1688,19 @@ public class DFSOutputStream extends FSOutputSummer
boolean toNewBlock, int bufferSize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
String[] favoredNodes) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForAppend", src);
try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
}
out.start();
return out;
} finally {
scope.close();
}
out.start();
return out;
}
private static boolean isLazyPersist(HdfsFileStatus stat) {
@ -1707,6 +1725,7 @@ public class DFSOutputStream extends FSOutputSummer
private void queueCurrentPacket() {
synchronized (dataQueue) {
if (currentPacket == null) return;
currentPacket.addTraceParent(Trace.currentSpan());
dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.getSeqno();
if (DFSClient.LOG.isDebugEnabled()) {
@ -1721,23 +1740,39 @@ public class DFSOutputStream extends FSOutputSummer
synchronized (dataQueue) {
try {
// If queue is full, then wait till we have enough space
while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
boolean firstWait = true;
try {
dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
while (!isClosed() && dataQueue.size() + ackQueue.size() >
dfsClient.getConf().writeMaxPackets) {
if (firstWait) {
Span span = Trace.currentSpan();
if (span != null) {
span.addTimelineAnnotation("dataQueue.wait");
}
firstWait = false;
}
try {
dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
} finally {
Span span = Trace.currentSpan();
if ((span != null) && (!firstWait)) {
span.addTimelineAnnotation("end.wait");
}
}
}
checkClosed();
queueCurrentPacket();
checkClosed();
queueCurrentPacket();
} catch (ClosedChannelException e) {
}
}
@ -1747,6 +1782,17 @@ public class DFSOutputStream extends FSOutputSummer
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
try {
writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
} finally {
scope.close();
}
}
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen();
checkClosed();
@ -1835,12 +1881,24 @@ public class DFSOutputStream extends FSOutputSummer
*/
@Override
public void hflush() throws IOException {
flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
TraceScope scope =
dfsClient.getPathTraceScope("hflush", src);
try {
flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
} finally {
scope.close();
}
}
@Override
public void hsync() throws IOException {
hsync(EnumSet.noneOf(SyncFlag.class));
TraceScope scope =
dfsClient.getPathTraceScope("hsync", src);
try {
flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
} finally {
scope.close();
}
}
/**
@ -1857,7 +1915,13 @@ public class DFSOutputStream extends FSOutputSummer
* whether or not to update the block length in NameNode.
*/
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
flushOrSync(true, syncFlags);
TraceScope scope =
dfsClient.getPathTraceScope("hsync", src);
try {
flushOrSync(true, syncFlags);
} finally {
scope.close();
}
}
/**
@ -2038,33 +2102,38 @@ public class DFSOutputStream extends FSOutputSummer
}
private void waitForAckedSeqno(long seqno) throws IOException {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
long begin = Time.monotonicNow();
TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
try {
synchronized (dataQueue) {
while (!isClosed()) {
checkClosed();
if (lastAckedSeqno >= seqno) {
break;
}
try {
dataQueue.wait(1000); // when we receive an ack, we notify on
// dataQueue
} catch (InterruptedException ie) {
throw new InterruptedIOException(
"Interrupted while waiting for data to be acknowledged by pipeline");
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
long begin = Time.monotonicNow();
try {
synchronized (dataQueue) {
while (!isClosed()) {
checkClosed();
if (lastAckedSeqno >= seqno) {
break;
}
try {
dataQueue.wait(1000); // when we receive an ack, we notify on
// dataQueue
} catch (InterruptedException ie) {
throw new InterruptedIOException(
"Interrupted while waiting for data to be acknowledged by pipeline");
}
}
}
checkClosed();
} catch (ClosedChannelException e) {
}
checkClosed();
} catch (ClosedChannelException e) {
}
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
}
} finally {
scope.close();
}
}
@ -2129,6 +2198,16 @@ public class DFSOutputStream extends FSOutputSummer
*/
@Override
public synchronized void close() throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSOutputStream#close", src);
try {
closeImpl();
} finally {
scope.close();
}
}
private synchronized void closeImpl() throws IOException {
if (isClosed()) {
IOException e = lastException.getAndSet(null);
if (e == null)
@ -2154,7 +2233,12 @@ public class DFSOutputStream extends FSOutputSummer
// get last block before destroying the streamer
ExtendedBlock lastBlock = streamer.getBlock();
closeThreads(false);
completeFile(lastBlock);
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
} finally {
scope.close();
}
dfsClient.endFileLease(fileId);
} catch (ClosedChannelException e) {
} finally {

View File

@ -21,9 +21,12 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.htrace.Span;
/****************************************************************
* DFSPacket is used by DataStreamer and DFSOutputStream.
@ -33,6 +36,7 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager;
class DFSPacket {
public static final long HEART_BEAT_SEQNO = -1L;
private static long[] EMPTY = new long[0];
private final long seqno; // sequence number of buffer in block
private final long offsetInBlock; // offset in block
private boolean syncBlock; // this packet forces the current block to disk
@ -59,6 +63,9 @@ class DFSPacket {
private int checksumPos;
private final int dataStart;
private int dataPos;
private long[] traceParents = EMPTY;
private int traceParentsUsed;
private Span span;
/**
* Create a new packet.
@ -267,4 +274,70 @@ class DFSPacket {
" lastPacketInBlock: " + this.lastPacketInBlock +
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
}
/**
* Add a trace parent span for this packet.<p/>
*
* Trace parent spans for a packet are the trace spans responsible for
* adding data to that packet. We store them as an array of longs for
* efficiency.<p/>
*
* Protected by the DFSOutputStream dataQueue lock.
*/
public void addTraceParent(Span span) {
if (span == null) {
return;
}
addTraceParent(span.getSpanId());
}
public void addTraceParent(long id) {
if (traceParentsUsed == traceParents.length) {
int newLength = (traceParents.length == 0) ? 8 :
traceParents.length * 2;
traceParents = Arrays.copyOf(traceParents, newLength);
}
traceParents[traceParentsUsed] = id;
traceParentsUsed++;
}
/**
* Get the trace parent spans for this packet.<p/>
*
* Will always be non-null.<p/>
*
* Protected by the DFSOutputStream dataQueue lock.
*/
public long[] getTraceParents() {
// Remove duplicates from the array.
int len = traceParentsUsed;
Arrays.sort(traceParents, 0, len);
int i = 0, j = 0;
long prevVal = 0; // 0 is not a valid span id
while (true) {
if (i == len) {
break;
}
long val = traceParents[i];
if (val != prevVal) {
traceParents[j] = val;
j++;
prevVal = val;
}
i++;
}
if (j < traceParents.length) {
traceParents = Arrays.copyOf(traceParents, j);
traceParentsUsed = traceParents.length;
}
return traceParents;
}
public void setTraceSpan(Span span) {
this.span = span;
}
public Span getTraceSpan() {
return span;
}
}

View File

@ -65,4 +65,29 @@ public class TestDFSPacket {
}
}
}
@Test
public void testAddParentsGetParents() throws Exception {
DFSPacket p = new DFSPacket(null, maxChunksPerPacket,
0, 0, checksumSize, false);
long parents[] = p.getTraceParents();
Assert.assertEquals(0, parents.length);
p.addTraceParent(123);
p.addTraceParent(123);
parents = p.getTraceParents();
Assert.assertEquals(1, parents.length);
Assert.assertEquals(123, parents[0]);
parents = p.getTraceParents(); // test calling 'get' again.
Assert.assertEquals(1, parents.length);
Assert.assertEquals(123, parents[0]);
p.addTraceParent(1);
p.addTraceParent(456);
p.addTraceParent(789);
parents = p.getTraceParents();
Assert.assertEquals(4, parents.length);
Assert.assertEquals(1, parents[0]);
Assert.assertEquals(123, parents[1]);
Assert.assertEquals(456, parents[2]);
Assert.assertEquals(789, parents[3]);
}
}