HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-03-18 18:06:17 -07:00
parent 20b49224eb
commit 8234fd0e10
4 changed files with 300 additions and 116 deletions

View File

@ -759,6 +759,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via
Colin P. McCabe) Colin P. McCabe)
HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -96,8 +96,11 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.htrace.NullScope;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span; import org.apache.htrace.Span;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -272,17 +275,11 @@ public class DFSOutputStream extends FSOutputSummer
/** Append on an existing block? */ /** Append on an existing block? */
private final boolean isAppend; private final boolean isAppend;
private final Span traceSpan; private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
isAppend = false; isAppend = false;
isLazyPersistFile = isLazyPersist(stat); isLazyPersistFile = isLazyPersist(stat);
this.block = block; this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
} }
/** /**
@ -293,10 +290,9 @@ public class DFSOutputStream extends FSOutputSummer
* @throws IOException if error occurs * @throws IOException if error occurs
*/ */
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum, Span span) throws IOException { int bytesPerChecksum) throws IOException {
isAppend = true; isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
traceSpan = span;
block = lastBlock.getBlock(); block = lastBlock.getBlock();
bytesSent = block.getNumBytes(); bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken(); accessToken = lastBlock.getBlockToken();
@ -387,12 +383,8 @@ public class DFSOutputStream extends FSOutputSummer
@Override @Override
public void run() { public void run() {
long lastPacket = Time.now(); long lastPacket = Time.now();
TraceScope traceScope = null; TraceScope scope = NullScope.INSTANCE;
if (traceSpan != null) {
traceScope = Trace.continueSpan(traceSpan);
}
while (!streamerClosed && dfsClient.clientRunning) { while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder // if the Responder encountered an error, shutdown Responder
if (hasError && response != null) { if (hasError && response != null) {
try { try {
@ -438,11 +430,18 @@ public class DFSOutputStream extends FSOutputSummer
// get packet to be sent. // get packet to be sent.
if (dataQueue.isEmpty()) { if (dataQueue.isEmpty()) {
one = createHeartbeatPacket(); one = createHeartbeatPacket();
assert one != null;
} else { } else {
one = dataQueue.getFirst(); // regular data packet one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
if (parents.length > 0) {
scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
// TODO: use setParents API once it's available from HTrace 3.2
// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
// scope.getSpan().setParents(parents);
}
} }
} }
assert one != null;
// get new block from namenode. // get new block from namenode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@ -488,9 +487,12 @@ public class DFSOutputStream extends FSOutputSummer
} }
// send the packet // send the packet
Span span = null;
synchronized (dataQueue) { synchronized (dataQueue) {
// move packet from dataQueue to ackQueue // move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) { if (!one.isHeartbeatPacket()) {
span = scope.detach();
one.setTraceSpan(span);
dataQueue.removeFirst(); dataQueue.removeFirst();
ackQueue.addLast(one); ackQueue.addLast(one);
dataQueue.notifyAll(); dataQueue.notifyAll();
@ -503,6 +505,7 @@ public class DFSOutputStream extends FSOutputSummer
} }
// write out data to remote datanode // write out data to remote datanode
TraceScope writeScope = Trace.startSpan("writeTo", span);
try { try {
one.writeTo(blockStream); one.writeTo(blockStream);
blockStream.flush(); blockStream.flush();
@ -515,6 +518,8 @@ public class DFSOutputStream extends FSOutputSummer
// will be taken out then. // will be taken out then.
tryMarkPrimaryDatanodeFailed(); tryMarkPrimaryDatanodeFailed();
throw e; throw e;
} finally {
writeScope.close();
} }
lastPacket = Time.now(); lastPacket = Time.now();
@ -570,11 +575,10 @@ public class DFSOutputStream extends FSOutputSummer
// Not a datanode issue // Not a datanode issue
streamerClosed = true; streamerClosed = true;
} }
} finally {
scope.close();
} }
} }
if (traceScope != null) {
traceScope.close();
}
closeInternal(); closeInternal();
} }
@ -729,6 +733,7 @@ public class DFSOutputStream extends FSOutputSummer
setName("ResponseProcessor for block " + block); setName("ResponseProcessor for block " + block);
PipelineAck ack = new PipelineAck(); PipelineAck ack = new PipelineAck();
TraceScope scope = NullScope.INSTANCE;
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes. // process responses from datanodes.
try { try {
@ -803,6 +808,8 @@ public class DFSOutputStream extends FSOutputSummer
block.setNumBytes(one.getLastByteOffsetBlock()); block.setNumBytes(one.getLastByteOffsetBlock());
synchronized (dataQueue) { synchronized (dataQueue) {
scope = Trace.continueSpan(one.getTraceSpan());
one.setTraceSpan(null);
lastAckedSeqno = seqno; lastAckedSeqno = seqno;
ackQueue.removeFirst(); ackQueue.removeFirst();
dataQueue.notifyAll(); dataQueue.notifyAll();
@ -827,6 +834,8 @@ public class DFSOutputStream extends FSOutputSummer
} }
responderClosed = true; responderClosed = true;
} }
} finally {
scope.close();
} }
} }
} }
@ -887,6 +896,12 @@ public class DFSOutputStream extends FSOutputSummer
// a client waiting on close() will be aware that the flush finished. // a client waiting on close() will be aware that the flush finished.
synchronized (dataQueue) { synchronized (dataQueue) {
DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
Span span = endOfBlockPacket.getTraceSpan();
if (span != null) {
// Close any trace span associated with this Packet
TraceScope scope = Trace.continueSpan(span);
scope.close();
}
assert endOfBlockPacket.isLastPacketInBlock(); assert endOfBlockPacket.isLastPacketInBlock();
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
lastAckedSeqno = endOfBlockPacket.getSeqno(); lastAckedSeqno = endOfBlockPacket.getSeqno();
@ -1594,11 +1609,7 @@ public class DFSOutputStream extends FSOutputSummer
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
Span traceSpan = null; streamer = new DataStreamer(stat, null);
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
streamer = new DataStreamer(stat, null, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) { if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes); streamer.setFavoredNodes(favoredNodes);
} }
@ -1608,50 +1619,56 @@ public class DFSOutputStream extends FSOutputSummer
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize, short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException { DataChecksum checksum, String[] favoredNodes) throws IOException {
HdfsFileStatus stat = null; TraceScope scope =
dfsClient.getPathTraceScope("newStreamForCreate", src);
try {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum // Retry the create if we get a RetryStartFileException up to a maximum
// number of times // number of times
boolean shouldRetry = true; boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT; int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) { while (shouldRetry) {
shouldRetry = false; shouldRetry = false;
try { try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication, new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS); blockSize, SUPPORTED_CRYPTO_VERSIONS);
break; break;
} catch (RemoteException re) { } catch (RemoteException re) {
IOException e = re.unwrapRemoteException( IOException e = re.unwrapRemoteException(
AccessControlException.class, AccessControlException.class,
DSQuotaExceededException.class, DSQuotaExceededException.class,
FileAlreadyExistsException.class, FileAlreadyExistsException.class,
FileNotFoundException.class, FileNotFoundException.class,
ParentNotDirectoryException.class, ParentNotDirectoryException.class,
NSQuotaExceededException.class, NSQuotaExceededException.class,
RetryStartFileException.class, RetryStartFileException.class,
SafeModeException.class, SafeModeException.class,
UnresolvedPathException.class, UnresolvedPathException.class,
SnapshotAccessControlException.class, SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class); UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) { if (e instanceof RetryStartFileException) {
if (retryCount > 0) { if (retryCount > 0) {
shouldRetry = true; shouldRetry = true;
retryCount--; retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else { } else {
throw new IOException("Too many retries because of encryption" + throw e;
" zone operations", e);
} }
} else {
throw e;
} }
} }
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
out.start();
return out;
} finally {
scope.close();
} }
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
out.start();
return out;
} }
/** Construct a new output stream for append. */ /** Construct a new output stream for append. */
@ -1661,21 +1678,16 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum); this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened initialFileSize = stat.getLen(); // length of file when opened
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
// The last partial block of the file has to be filled. // The last partial block of the file has to be filled.
if (!toNewBlock && lastBlock != null) { if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block // indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize(); bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan); streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
} else { } else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, computePacketChunkSize(dfsClient.getConf().writePacketSize,
bytesPerChecksum); bytesPerChecksum);
streamer = new DataStreamer(stat, streamer = new DataStreamer(stat,
lastBlock != null ? lastBlock.getBlock() : null, traceSpan); lastBlock != null ? lastBlock.getBlock() : null);
} }
this.fileEncryptionInfo = stat.getFileEncryptionInfo(); this.fileEncryptionInfo = stat.getFileEncryptionInfo();
} }
@ -1684,13 +1696,19 @@ public class DFSOutputStream extends FSOutputSummer
boolean toNewBlock, int bufferSize, Progressable progress, boolean toNewBlock, int bufferSize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
String[] favoredNodes) throws IOException { String[] favoredNodes) throws IOException {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock, TraceScope scope =
progress, lastBlock, stat, checksum); dfsClient.getPathTraceScope("newStreamForAppend", src);
if (favoredNodes != null && favoredNodes.length != 0) { try {
out.streamer.setFavoredNodes(favoredNodes); final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
}
out.start();
return out;
} finally {
scope.close();
} }
out.start();
return out;
} }
private static boolean isLazyPersist(HdfsFileStatus stat) { private static boolean isLazyPersist(HdfsFileStatus stat) {
@ -1715,6 +1733,7 @@ public class DFSOutputStream extends FSOutputSummer
private void queueCurrentPacket() { private void queueCurrentPacket() {
synchronized (dataQueue) { synchronized (dataQueue) {
if (currentPacket == null) return; if (currentPacket == null) return;
currentPacket.addTraceParent(Trace.currentSpan());
dataQueue.addLast(currentPacket); dataQueue.addLast(currentPacket);
lastQueuedSeqno = currentPacket.getSeqno(); lastQueuedSeqno = currentPacket.getSeqno();
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
@ -1729,23 +1748,39 @@ public class DFSOutputStream extends FSOutputSummer
synchronized (dataQueue) { synchronized (dataQueue) {
try { try {
// If queue is full, then wait till we have enough space // If queue is full, then wait till we have enough space
while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) { boolean firstWait = true;
try { try {
dataQueue.wait(); while (!isClosed() && dataQueue.size() + ackQueue.size() >
} catch (InterruptedException e) { dfsClient.getConf().writeMaxPackets) {
// If we get interrupted while waiting to queue data, we still need to get rid if (firstWait) {
// of the current packet. This is because we have an invariant that if Span span = Trace.currentSpan();
// currentPacket gets full, it will get queued before the next writeChunk. if (span != null) {
// span.addTimelineAnnotation("dataQueue.wait");
// Rather than wait around for space in the queue, we should instead try to }
// return to the caller as soon as possible, even though we slightly overrun firstWait = false;
// the MAX_PACKETS length. }
Thread.currentThread().interrupt(); try {
break; dataQueue.wait();
} catch (InterruptedException e) {
// If we get interrupted while waiting to queue data, we still need to get rid
// of the current packet. This is because we have an invariant that if
// currentPacket gets full, it will get queued before the next writeChunk.
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
// the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
} finally {
Span span = Trace.currentSpan();
if ((span != null) && (!firstWait)) {
span.addTimelineAnnotation("end.wait");
}
} }
} checkClosed();
checkClosed(); queueCurrentPacket();
queueCurrentPacket();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
} }
} }
@ -1755,6 +1790,17 @@ public class DFSOutputStream extends FSOutputSummer
@Override @Override
protected synchronized void writeChunk(byte[] b, int offset, int len, protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException { byte[] checksum, int ckoff, int cklen) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
try {
writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
} finally {
scope.close();
}
}
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
checkClosed(); checkClosed();
@ -1838,12 +1884,24 @@ public class DFSOutputStream extends FSOutputSummer
*/ */
@Override @Override
public void hflush() throws IOException { public void hflush() throws IOException {
flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); TraceScope scope =
dfsClient.getPathTraceScope("hflush", src);
try {
flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
} finally {
scope.close();
}
} }
@Override @Override
public void hsync() throws IOException { public void hsync() throws IOException {
hsync(EnumSet.noneOf(SyncFlag.class)); TraceScope scope =
dfsClient.getPathTraceScope("hsync", src);
try {
flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
} finally {
scope.close();
}
} }
/** /**
@ -1860,7 +1918,13 @@ public class DFSOutputStream extends FSOutputSummer
* whether or not to update the block length in NameNode. * whether or not to update the block length in NameNode.
*/ */
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
flushOrSync(true, syncFlags); TraceScope scope =
dfsClient.getPathTraceScope("hsync", src);
try {
flushOrSync(true, syncFlags);
} finally {
scope.close();
}
} }
/** /**
@ -2041,33 +2105,38 @@ public class DFSOutputStream extends FSOutputSummer
} }
private void waitForAckedSeqno(long seqno) throws IOException { private void waitForAckedSeqno(long seqno) throws IOException {
if (DFSClient.LOG.isDebugEnabled()) { TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
long begin = Time.monotonicNow();
try { try {
synchronized (dataQueue) { if (DFSClient.LOG.isDebugEnabled()) {
while (!isClosed()) { DFSClient.LOG.debug("Waiting for ack for: " + seqno);
checkClosed(); }
if (lastAckedSeqno >= seqno) { long begin = Time.monotonicNow();
break; try {
} synchronized (dataQueue) {
try { while (!isClosed()) {
dataQueue.wait(1000); // when we receive an ack, we notify on checkClosed();
// dataQueue if (lastAckedSeqno >= seqno) {
} catch (InterruptedException ie) { break;
throw new InterruptedIOException( }
"Interrupted while waiting for data to be acknowledged by pipeline"); try {
dataQueue.wait(1000); // when we receive an ack, we notify on
// dataQueue
} catch (InterruptedException ie) {
throw new InterruptedIOException(
"Interrupted while waiting for data to be acknowledged by pipeline");
}
} }
} }
checkClosed();
} catch (ClosedChannelException e) {
} }
checkClosed(); long duration = Time.monotonicNow() - begin;
} catch (ClosedChannelException e) { if (duration > dfsclientSlowLogThresholdMs) {
} DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
long duration = Time.monotonicNow() - begin; + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
if (duration > dfsclientSlowLogThresholdMs) { }
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration } finally {
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); scope.close();
} }
} }
@ -2132,6 +2201,16 @@ public class DFSOutputStream extends FSOutputSummer
*/ */
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("DFSOutputStream#close", src);
try {
closeImpl();
} finally {
scope.close();
}
}
private synchronized void closeImpl() throws IOException {
if (isClosed()) { if (isClosed()) {
IOException e = lastException.getAndSet(null); IOException e = lastException.getAndSet(null);
if (e == null) if (e == null)
@ -2157,7 +2236,12 @@ public class DFSOutputStream extends FSOutputSummer
// get last block before destroying the streamer // get last block before destroying the streamer
ExtendedBlock lastBlock = streamer.getBlock(); ExtendedBlock lastBlock = streamer.getBlock();
closeThreads(false); closeThreads(false);
completeFile(lastBlock); TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
try {
completeFile(lastBlock);
} finally {
scope.close();
}
dfsClient.endFileLease(fileId); dfsClient.endFileLease(fileId);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
} finally { } finally {

View File

@ -21,9 +21,12 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.BufferOverflowException; import java.nio.BufferOverflowException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.htrace.Span;
/**************************************************************** /****************************************************************
* DFSPacket is used by DataStreamer and DFSOutputStream. * DFSPacket is used by DataStreamer and DFSOutputStream.
@ -33,6 +36,7 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager;
class DFSPacket { class DFSPacket {
public static final long HEART_BEAT_SEQNO = -1L; public static final long HEART_BEAT_SEQNO = -1L;
private static long[] EMPTY = new long[0];
private final long seqno; // sequence number of buffer in block private final long seqno; // sequence number of buffer in block
private final long offsetInBlock; // offset in block private final long offsetInBlock; // offset in block
private boolean syncBlock; // this packet forces the current block to disk private boolean syncBlock; // this packet forces the current block to disk
@ -59,6 +63,9 @@ class DFSPacket {
private int checksumPos; private int checksumPos;
private final int dataStart; private final int dataStart;
private int dataPos; private int dataPos;
private long[] traceParents = EMPTY;
private int traceParentsUsed;
private Span span;
/** /**
* Create a new packet. * Create a new packet.
@ -267,4 +274,70 @@ class DFSPacket {
" lastPacketInBlock: " + this.lastPacketInBlock + " lastPacketInBlock: " + this.lastPacketInBlock +
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
} }
/**
* Add a trace parent span for this packet.<p/>
*
* Trace parent spans for a packet are the trace spans responsible for
* adding data to that packet. We store them as an array of longs for
* efficiency.<p/>
*
* Protected by the DFSOutputStream dataQueue lock.
*/
public void addTraceParent(Span span) {
if (span == null) {
return;
}
addTraceParent(span.getSpanId());
}
public void addTraceParent(long id) {
if (traceParentsUsed == traceParents.length) {
int newLength = (traceParents.length == 0) ? 8 :
traceParents.length * 2;
traceParents = Arrays.copyOf(traceParents, newLength);
}
traceParents[traceParentsUsed] = id;
traceParentsUsed++;
}
/**
* Get the trace parent spans for this packet.<p/>
*
* Will always be non-null.<p/>
*
* Protected by the DFSOutputStream dataQueue lock.
*/
public long[] getTraceParents() {
// Remove duplicates from the array.
int len = traceParentsUsed;
Arrays.sort(traceParents, 0, len);
int i = 0, j = 0;
long prevVal = 0; // 0 is not a valid span id
while (true) {
if (i == len) {
break;
}
long val = traceParents[i];
if (val != prevVal) {
traceParents[j] = val;
j++;
prevVal = val;
}
i++;
}
if (j < traceParents.length) {
traceParents = Arrays.copyOf(traceParents, j);
traceParentsUsed = traceParents.length;
}
return traceParents;
}
public void setTraceSpan(Span span) {
this.span = span;
}
public Span getTraceSpan() {
return span;
}
} }

View File

@ -65,4 +65,29 @@ public class TestDFSPacket {
} }
} }
} }
@Test
public void testAddParentsGetParents() throws Exception {
DFSPacket p = new DFSPacket(null, maxChunksPerPacket,
0, 0, checksumSize, false);
long parents[] = p.getTraceParents();
Assert.assertEquals(0, parents.length);
p.addTraceParent(123);
p.addTraceParent(123);
parents = p.getTraceParents();
Assert.assertEquals(1, parents.length);
Assert.assertEquals(123, parents[0]);
parents = p.getTraceParents(); // test calling 'get' again.
Assert.assertEquals(1, parents.length);
Assert.assertEquals(123, parents[0]);
p.addTraceParent(1);
p.addTraceParent(456);
p.addTraceParent(789);
parents = p.getTraceParents();
Assert.assertEquals(4, parents.length);
Assert.assertEquals(1, parents[0]);
Assert.assertEquals(123, parents[1]);
Assert.assertEquals(456, parents[2]);
Assert.assertEquals(789, parents[3]);
}
} }