HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
(cherry picked from commit 8234fd0e10
)
This commit is contained in:
parent
05627dd96d
commit
79c07bbaca
|
@ -454,6 +454,8 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via
|
||||
Colin P. McCabe)
|
||||
|
||||
HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||
|
|
|
@ -96,8 +96,11 @@ import org.apache.hadoop.util.DataChecksum;
|
|||
import org.apache.hadoop.util.DataChecksum.Type;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.htrace.NullScope;
|
||||
import org.apache.htrace.Sampler;
|
||||
import org.apache.htrace.Span;
|
||||
import org.apache.htrace.Trace;
|
||||
import org.apache.htrace.TraceInfo;
|
||||
import org.apache.htrace.TraceScope;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -271,17 +274,11 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
/** Append on an existing block? */
|
||||
private final boolean isAppend;
|
||||
|
||||
private final Span traceSpan;
|
||||
|
||||
/**
|
||||
* construction with tracing info
|
||||
*/
|
||||
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
|
||||
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
|
||||
isAppend = false;
|
||||
isLazyPersistFile = isLazyPersist(stat);
|
||||
this.block = block;
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
traceSpan = span;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -292,10 +289,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
* @throws IOException if error occurs
|
||||
*/
|
||||
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
int bytesPerChecksum, Span span) throws IOException {
|
||||
int bytesPerChecksum) throws IOException {
|
||||
isAppend = true;
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
|
||||
traceSpan = span;
|
||||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
accessToken = lastBlock.getBlockToken();
|
||||
|
@ -386,12 +382,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
@Override
|
||||
public void run() {
|
||||
long lastPacket = Time.now();
|
||||
TraceScope traceScope = null;
|
||||
if (traceSpan != null) {
|
||||
traceScope = Trace.continueSpan(traceSpan);
|
||||
}
|
||||
TraceScope scope = NullScope.INSTANCE;
|
||||
while (!streamerClosed && dfsClient.clientRunning) {
|
||||
|
||||
// if the Responder encountered an error, shutdown Responder
|
||||
if (hasError && response != null) {
|
||||
try {
|
||||
|
@ -437,11 +429,18 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// get packet to be sent.
|
||||
if (dataQueue.isEmpty()) {
|
||||
one = createHeartbeatPacket();
|
||||
assert one != null;
|
||||
} else {
|
||||
one = dataQueue.getFirst(); // regular data packet
|
||||
long parents[] = one.getTraceParents();
|
||||
if (parents.length > 0) {
|
||||
scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
|
||||
// TODO: use setParents API once it's available from HTrace 3.2
|
||||
// scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
|
||||
// scope.getSpan().setParents(parents);
|
||||
}
|
||||
}
|
||||
}
|
||||
assert one != null;
|
||||
|
||||
// get new block from namenode.
|
||||
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
|
||||
|
@ -487,9 +486,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
// send the packet
|
||||
Span span = null;
|
||||
synchronized (dataQueue) {
|
||||
// move packet from dataQueue to ackQueue
|
||||
if (!one.isHeartbeatPacket()) {
|
||||
span = scope.detach();
|
||||
one.setTraceSpan(span);
|
||||
dataQueue.removeFirst();
|
||||
ackQueue.addLast(one);
|
||||
dataQueue.notifyAll();
|
||||
|
@ -502,6 +504,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
// write out data to remote datanode
|
||||
TraceScope writeScope = Trace.startSpan("writeTo", span);
|
||||
try {
|
||||
one.writeTo(blockStream);
|
||||
blockStream.flush();
|
||||
|
@ -514,6 +517,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// will be taken out then.
|
||||
tryMarkPrimaryDatanodeFailed();
|
||||
throw e;
|
||||
} finally {
|
||||
writeScope.close();
|
||||
}
|
||||
lastPacket = Time.now();
|
||||
|
||||
|
@ -569,11 +574,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// Not a datanode issue
|
||||
streamerClosed = true;
|
||||
}
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
if (traceScope != null) {
|
||||
traceScope.close();
|
||||
}
|
||||
closeInternal();
|
||||
}
|
||||
|
||||
|
@ -728,6 +732,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
setName("ResponseProcessor for block " + block);
|
||||
PipelineAck ack = new PipelineAck();
|
||||
|
||||
TraceScope scope = NullScope.INSTANCE;
|
||||
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
|
||||
// process responses from datanodes.
|
||||
try {
|
||||
|
@ -802,6 +807,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
block.setNumBytes(one.getLastByteOffsetBlock());
|
||||
|
||||
synchronized (dataQueue) {
|
||||
scope = Trace.continueSpan(one.getTraceSpan());
|
||||
one.setTraceSpan(null);
|
||||
lastAckedSeqno = seqno;
|
||||
ackQueue.removeFirst();
|
||||
dataQueue.notifyAll();
|
||||
|
@ -826,6 +833,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
responderClosed = true;
|
||||
}
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -886,6 +895,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// a client waiting on close() will be aware that the flush finished.
|
||||
synchronized (dataQueue) {
|
||||
DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
|
||||
Span span = endOfBlockPacket.getTraceSpan();
|
||||
if (span != null) {
|
||||
// Close any trace span associated with this Packet
|
||||
TraceScope scope = Trace.continueSpan(span);
|
||||
scope.close();
|
||||
}
|
||||
assert endOfBlockPacket.isLastPacketInBlock();
|
||||
assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
|
||||
lastAckedSeqno = endOfBlockPacket.getSeqno();
|
||||
|
@ -1593,11 +1608,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
|
||||
|
||||
Span traceSpan = null;
|
||||
if (Trace.isTracing()) {
|
||||
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
|
||||
}
|
||||
streamer = new DataStreamer(stat, null, traceSpan);
|
||||
streamer = new DataStreamer(stat, null);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
streamer.setFavoredNodes(favoredNodes);
|
||||
}
|
||||
|
@ -1607,50 +1618,56 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
|
||||
short replication, long blockSize, Progressable progress, int buffersize,
|
||||
DataChecksum checksum, String[] favoredNodes) throws IOException {
|
||||
HdfsFileStatus stat = null;
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("newStreamForCreate", src);
|
||||
try {
|
||||
HdfsFileStatus stat = null;
|
||||
|
||||
// Retry the create if we get a RetryStartFileException up to a maximum
|
||||
// number of times
|
||||
boolean shouldRetry = true;
|
||||
int retryCount = CREATE_RETRY_COUNT;
|
||||
while (shouldRetry) {
|
||||
shouldRetry = false;
|
||||
try {
|
||||
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
|
||||
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
|
||||
blockSize, SUPPORTED_CRYPTO_VERSIONS);
|
||||
break;
|
||||
} catch (RemoteException re) {
|
||||
IOException e = re.unwrapRemoteException(
|
||||
AccessControlException.class,
|
||||
DSQuotaExceededException.class,
|
||||
FileAlreadyExistsException.class,
|
||||
FileNotFoundException.class,
|
||||
ParentNotDirectoryException.class,
|
||||
NSQuotaExceededException.class,
|
||||
RetryStartFileException.class,
|
||||
SafeModeException.class,
|
||||
UnresolvedPathException.class,
|
||||
SnapshotAccessControlException.class,
|
||||
UnknownCryptoProtocolVersionException.class);
|
||||
if (e instanceof RetryStartFileException) {
|
||||
if (retryCount > 0) {
|
||||
shouldRetry = true;
|
||||
retryCount--;
|
||||
// Retry the create if we get a RetryStartFileException up to a maximum
|
||||
// number of times
|
||||
boolean shouldRetry = true;
|
||||
int retryCount = CREATE_RETRY_COUNT;
|
||||
while (shouldRetry) {
|
||||
shouldRetry = false;
|
||||
try {
|
||||
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
|
||||
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
|
||||
blockSize, SUPPORTED_CRYPTO_VERSIONS);
|
||||
break;
|
||||
} catch (RemoteException re) {
|
||||
IOException e = re.unwrapRemoteException(
|
||||
AccessControlException.class,
|
||||
DSQuotaExceededException.class,
|
||||
FileAlreadyExistsException.class,
|
||||
FileNotFoundException.class,
|
||||
ParentNotDirectoryException.class,
|
||||
NSQuotaExceededException.class,
|
||||
RetryStartFileException.class,
|
||||
SafeModeException.class,
|
||||
UnresolvedPathException.class,
|
||||
SnapshotAccessControlException.class,
|
||||
UnknownCryptoProtocolVersionException.class);
|
||||
if (e instanceof RetryStartFileException) {
|
||||
if (retryCount > 0) {
|
||||
shouldRetry = true;
|
||||
retryCount--;
|
||||
} else {
|
||||
throw new IOException("Too many retries because of encryption" +
|
||||
" zone operations", e);
|
||||
}
|
||||
} else {
|
||||
throw new IOException("Too many retries because of encryption" +
|
||||
" zone operations", e);
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
|
||||
flag, progress, checksum, favoredNodes);
|
||||
out.start();
|
||||
return out;
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
|
||||
flag, progress, checksum, favoredNodes);
|
||||
out.start();
|
||||
return out;
|
||||
}
|
||||
|
||||
/** Construct a new output stream for append. */
|
||||
|
@ -1660,21 +1677,16 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this(dfsClient, src, progress, stat, checksum);
|
||||
initialFileSize = stat.getLen(); // length of file when opened
|
||||
|
||||
Span traceSpan = null;
|
||||
if (Trace.isTracing()) {
|
||||
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
|
||||
}
|
||||
|
||||
// The last partial block of the file has to be filled.
|
||||
if (!toNewBlock && lastBlock != null) {
|
||||
// indicate that we are appending to an existing block
|
||||
bytesCurBlock = lastBlock.getBlockSize();
|
||||
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
|
||||
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
|
||||
} else {
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
bytesPerChecksum);
|
||||
streamer = new DataStreamer(stat,
|
||||
lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
|
||||
lastBlock != null ? lastBlock.getBlock() : null);
|
||||
}
|
||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||
}
|
||||
|
@ -1683,13 +1695,19 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
boolean toNewBlock, int bufferSize, Progressable progress,
|
||||
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
|
||||
String[] favoredNodes) throws IOException {
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
|
||||
progress, lastBlock, stat, checksum);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
out.streamer.setFavoredNodes(favoredNodes);
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("newStreamForAppend", src);
|
||||
try {
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
|
||||
progress, lastBlock, stat, checksum);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
out.streamer.setFavoredNodes(favoredNodes);
|
||||
}
|
||||
out.start();
|
||||
return out;
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
out.start();
|
||||
return out;
|
||||
}
|
||||
|
||||
private static boolean isLazyPersist(HdfsFileStatus stat) {
|
||||
|
@ -1714,6 +1732,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private void queueCurrentPacket() {
|
||||
synchronized (dataQueue) {
|
||||
if (currentPacket == null) return;
|
||||
currentPacket.addTraceParent(Trace.currentSpan());
|
||||
dataQueue.addLast(currentPacket);
|
||||
lastQueuedSeqno = currentPacket.getSeqno();
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
|
@ -1728,23 +1747,39 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
synchronized (dataQueue) {
|
||||
try {
|
||||
// If queue is full, then wait till we have enough space
|
||||
while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
|
||||
boolean firstWait = true;
|
||||
try {
|
||||
dataQueue.wait();
|
||||
} catch (InterruptedException e) {
|
||||
// If we get interrupted while waiting to queue data, we still need to get rid
|
||||
// of the current packet. This is because we have an invariant that if
|
||||
// currentPacket gets full, it will get queued before the next writeChunk.
|
||||
//
|
||||
// Rather than wait around for space in the queue, we should instead try to
|
||||
// return to the caller as soon as possible, even though we slightly overrun
|
||||
// the MAX_PACKETS length.
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
while (!isClosed() && dataQueue.size() + ackQueue.size() >
|
||||
dfsClient.getConf().writeMaxPackets) {
|
||||
if (firstWait) {
|
||||
Span span = Trace.currentSpan();
|
||||
if (span != null) {
|
||||
span.addTimelineAnnotation("dataQueue.wait");
|
||||
}
|
||||
firstWait = false;
|
||||
}
|
||||
try {
|
||||
dataQueue.wait();
|
||||
} catch (InterruptedException e) {
|
||||
// If we get interrupted while waiting to queue data, we still need to get rid
|
||||
// of the current packet. This is because we have an invariant that if
|
||||
// currentPacket gets full, it will get queued before the next writeChunk.
|
||||
//
|
||||
// Rather than wait around for space in the queue, we should instead try to
|
||||
// return to the caller as soon as possible, even though we slightly overrun
|
||||
// the MAX_PACKETS length.
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
Span span = Trace.currentSpan();
|
||||
if ((span != null) && (!firstWait)) {
|
||||
span.addTimelineAnnotation("end.wait");
|
||||
}
|
||||
}
|
||||
}
|
||||
checkClosed();
|
||||
queueCurrentPacket();
|
||||
checkClosed();
|
||||
queueCurrentPacket();
|
||||
} catch (ClosedChannelException e) {
|
||||
}
|
||||
}
|
||||
|
@ -1754,6 +1789,17 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
@Override
|
||||
protected synchronized void writeChunk(byte[] b, int offset, int len,
|
||||
byte[] checksum, int ckoff, int cklen) throws IOException {
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
|
||||
try {
|
||||
writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void writeChunkImpl(byte[] b, int offset, int len,
|
||||
byte[] checksum, int ckoff, int cklen) throws IOException {
|
||||
dfsClient.checkOpen();
|
||||
checkClosed();
|
||||
|
||||
|
@ -1842,12 +1888,24 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
*/
|
||||
@Override
|
||||
public void hflush() throws IOException {
|
||||
flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("hflush", src);
|
||||
try {
|
||||
flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void hsync() throws IOException {
|
||||
hsync(EnumSet.noneOf(SyncFlag.class));
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("hsync", src);
|
||||
try {
|
||||
flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1864,7 +1922,13 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
* whether or not to update the block length in NameNode.
|
||||
*/
|
||||
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
|
||||
flushOrSync(true, syncFlags);
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("hsync", src);
|
||||
try {
|
||||
flushOrSync(true, syncFlags);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2045,33 +2109,38 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
private void waitForAckedSeqno(long seqno) throws IOException {
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
|
||||
}
|
||||
long begin = Time.monotonicNow();
|
||||
TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
|
||||
try {
|
||||
synchronized (dataQueue) {
|
||||
while (!isClosed()) {
|
||||
checkClosed();
|
||||
if (lastAckedSeqno >= seqno) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
dataQueue.wait(1000); // when we receive an ack, we notify on
|
||||
// dataQueue
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted while waiting for data to be acknowledged by pipeline");
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
|
||||
}
|
||||
long begin = Time.monotonicNow();
|
||||
try {
|
||||
synchronized (dataQueue) {
|
||||
while (!isClosed()) {
|
||||
checkClosed();
|
||||
if (lastAckedSeqno >= seqno) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
dataQueue.wait(1000); // when we receive an ack, we notify on
|
||||
// dataQueue
|
||||
} catch (InterruptedException ie) {
|
||||
throw new InterruptedIOException(
|
||||
"Interrupted while waiting for data to be acknowledged by pipeline");
|
||||
}
|
||||
}
|
||||
}
|
||||
checkClosed();
|
||||
} catch (ClosedChannelException e) {
|
||||
}
|
||||
checkClosed();
|
||||
} catch (ClosedChannelException e) {
|
||||
}
|
||||
long duration = Time.monotonicNow() - begin;
|
||||
if (duration > dfsclientSlowLogThresholdMs) {
|
||||
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
|
||||
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
|
||||
long duration = Time.monotonicNow() - begin;
|
||||
if (duration > dfsclientSlowLogThresholdMs) {
|
||||
DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
|
||||
+ "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
|
||||
}
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2136,6 +2205,16 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("DFSOutputStream#close", src);
|
||||
try {
|
||||
closeImpl();
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void closeImpl() throws IOException {
|
||||
if (isClosed()) {
|
||||
IOException e = lastException.getAndSet(null);
|
||||
if (e == null)
|
||||
|
@ -2161,7 +2240,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// get last block before destroying the streamer
|
||||
ExtendedBlock lastBlock = streamer.getBlock();
|
||||
closeThreads(false);
|
||||
completeFile(lastBlock);
|
||||
TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
|
||||
try {
|
||||
completeFile(lastBlock);
|
||||
} finally {
|
||||
scope.close();
|
||||
}
|
||||
dfsClient.endFileLease(fileId);
|
||||
} catch (ClosedChannelException e) {
|
||||
} finally {
|
||||
|
|
|
@ -21,9 +21,12 @@ import java.io.DataOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||
import org.apache.htrace.Span;
|
||||
|
||||
/****************************************************************
|
||||
* DFSPacket is used by DataStreamer and DFSOutputStream.
|
||||
|
@ -33,6 +36,7 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
|||
|
||||
class DFSPacket {
|
||||
public static final long HEART_BEAT_SEQNO = -1L;
|
||||
private static long[] EMPTY = new long[0];
|
||||
private final long seqno; // sequence number of buffer in block
|
||||
private final long offsetInBlock; // offset in block
|
||||
private boolean syncBlock; // this packet forces the current block to disk
|
||||
|
@ -59,6 +63,9 @@ class DFSPacket {
|
|||
private int checksumPos;
|
||||
private final int dataStart;
|
||||
private int dataPos;
|
||||
private long[] traceParents = EMPTY;
|
||||
private int traceParentsUsed;
|
||||
private Span span;
|
||||
|
||||
/**
|
||||
* Create a new packet.
|
||||
|
@ -267,4 +274,70 @@ class DFSPacket {
|
|||
" lastPacketInBlock: " + this.lastPacketInBlock +
|
||||
" lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a trace parent span for this packet.<p/>
|
||||
*
|
||||
* Trace parent spans for a packet are the trace spans responsible for
|
||||
* adding data to that packet. We store them as an array of longs for
|
||||
* efficiency.<p/>
|
||||
*
|
||||
* Protected by the DFSOutputStream dataQueue lock.
|
||||
*/
|
||||
public void addTraceParent(Span span) {
|
||||
if (span == null) {
|
||||
return;
|
||||
}
|
||||
addTraceParent(span.getSpanId());
|
||||
}
|
||||
|
||||
public void addTraceParent(long id) {
|
||||
if (traceParentsUsed == traceParents.length) {
|
||||
int newLength = (traceParents.length == 0) ? 8 :
|
||||
traceParents.length * 2;
|
||||
traceParents = Arrays.copyOf(traceParents, newLength);
|
||||
}
|
||||
traceParents[traceParentsUsed] = id;
|
||||
traceParentsUsed++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the trace parent spans for this packet.<p/>
|
||||
*
|
||||
* Will always be non-null.<p/>
|
||||
*
|
||||
* Protected by the DFSOutputStream dataQueue lock.
|
||||
*/
|
||||
public long[] getTraceParents() {
|
||||
// Remove duplicates from the array.
|
||||
int len = traceParentsUsed;
|
||||
Arrays.sort(traceParents, 0, len);
|
||||
int i = 0, j = 0;
|
||||
long prevVal = 0; // 0 is not a valid span id
|
||||
while (true) {
|
||||
if (i == len) {
|
||||
break;
|
||||
}
|
||||
long val = traceParents[i];
|
||||
if (val != prevVal) {
|
||||
traceParents[j] = val;
|
||||
j++;
|
||||
prevVal = val;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
if (j < traceParents.length) {
|
||||
traceParents = Arrays.copyOf(traceParents, j);
|
||||
traceParentsUsed = traceParents.length;
|
||||
}
|
||||
return traceParents;
|
||||
}
|
||||
|
||||
public void setTraceSpan(Span span) {
|
||||
this.span = span;
|
||||
}
|
||||
|
||||
public Span getTraceSpan() {
|
||||
return span;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,4 +65,29 @@ public class TestDFSPacket {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddParentsGetParents() throws Exception {
|
||||
DFSPacket p = new DFSPacket(null, maxChunksPerPacket,
|
||||
0, 0, checksumSize, false);
|
||||
long parents[] = p.getTraceParents();
|
||||
Assert.assertEquals(0, parents.length);
|
||||
p.addTraceParent(123);
|
||||
p.addTraceParent(123);
|
||||
parents = p.getTraceParents();
|
||||
Assert.assertEquals(1, parents.length);
|
||||
Assert.assertEquals(123, parents[0]);
|
||||
parents = p.getTraceParents(); // test calling 'get' again.
|
||||
Assert.assertEquals(1, parents.length);
|
||||
Assert.assertEquals(123, parents[0]);
|
||||
p.addTraceParent(1);
|
||||
p.addTraceParent(456);
|
||||
p.addTraceParent(789);
|
||||
parents = p.getTraceParents();
|
||||
Assert.assertEquals(4, parents.length);
|
||||
Assert.assertEquals(1, parents[0]);
|
||||
Assert.assertEquals(123, parents[1]);
|
||||
Assert.assertEquals(456, parents[2]);
|
||||
Assert.assertEquals(789, parents[3]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue