HADOOP-17272. ABFS Streams to support IOStatistics API (#2604)

Contributed by Mehakmeet Singh.

Change-Id: I3445dec84b9b9e43bb1e41f709944ea05416bd74
Mehakmeet Singh 2021-01-12 21:18:09 +05:30 committed by Steve Loughran
parent 4865589bb4
commit d20b2deac3
10 changed files with 443 additions and 200 deletions

File: StreamStatisticNames.java

@ -286,6 +286,78 @@ public final class StreamStatisticNames {
public static final String STREAM_WRITE_TOTAL_DATA
= "stream_write_total_data";
/**
* Number of bytes to upload from an OutputStream.
*/
public static final String BYTES_TO_UPLOAD
= "bytes_upload";
/**
* Number of bytes uploaded successfully to the object store.
*/
public static final String BYTES_UPLOAD_SUCCESSFUL
= "bytes_upload_successfully";
/**
* Number of bytes failed to upload to the object store.
*/
public static final String BYTES_UPLOAD_FAILED
= "bytes_upload_failed";
/**
* Total time spent on waiting for a task to complete.
*/
public static final String TIME_SPENT_ON_TASK_WAIT
= "time_spent_task_wait";
/**
* Number of task queue shrunk operations.
*/
public static final String QUEUE_SHRUNK_OPS
= "queue_shrunk_ops";
/**
* Number of times current buffer is written to the service.
*/
public static final String WRITE_CURRENT_BUFFER_OPERATIONS
= "write_current_buffer_ops";
/**
* Total time spent on completing a PUT request.
*/
public static final String TIME_SPENT_ON_PUT_REQUEST
= "time_spent_on_put_request";
/**
* Number of seeks in buffer.
*/
public static final String SEEK_IN_BUFFER
= "seek_in_buffer";
/**
* Number of bytes read from the buffer.
*/
public static final String BYTES_READ_BUFFER
= "bytes_read_buffer";
/**
* Total number of remote read operations performed.
*/
public static final String REMOTE_READ_OP
= "remote_read_op";
/**
* Total number of bytes read from readAhead.
*/
public static final String READ_AHEAD_BYTES_READ
= "read_ahead_bytes_read";
/**
* Total number of bytes read from remote operations.
*/
public static final String REMOTE_BYTES_READ
= "remote_bytes_read";
private StreamStatisticNames() {
}

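As a usage illustration (not part of this commit), the constants above are the keys under which the ABFS streams publish their counters. A minimal sketch of reading one of them back from an open stream, assuming the retrieveIOStatistics() and ioStatisticsToString() helpers from the same IOStatistics API; the path and class name are illustrative:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

public final class StreamStatsProbe {

  /** Read one byte and report the stream's remote-bytes-read counter. */
  public static long probe(FileSystem fs, Path path) throws IOException {
    try (FSDataInputStream in = fs.open(path)) {
      in.read();
      // FSDataInputStream is an IOStatisticsSource and forwards to the
      // wrapped AbfsInputStream, so the generic helper can fetch the stats.
      IOStatistics stats = retrieveIOStatistics(in);
      if (stats == null) {
        return 0L;
      }
      System.out.println(ioStatisticsToString(stats));
      return stats.counters()
          .getOrDefault(StreamStatisticNames.REMOTE_BYTES_READ, 0L);
    }
  }
}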
File: AbfsInputStream.java

@ -37,6 +37,11 @@ import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static java.lang.Math.max;
import static java.lang.Math.min;
@ -48,7 +53,7 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
* The AbfsInputStream for AbfsClient.
*/
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
-StreamCapabilities {
+StreamCapabilities, IOStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
// Footer size is set to qualify for both ORC and parquet files
public static final int FOOTER_SIZE = 16 * ONE_KB;
@ -92,6 +97,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private long bytesFromRemoteRead; // bytes read remotely; for testing
private final AbfsInputStreamContext context;
+private IOStatistics ioStatistics;
public AbfsInputStream(
final AbfsClient client,
@ -120,6 +126,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+if (streamStatistics != null) {
+ioStatistics = streamStatistics.getIOStatistics();
+}
}
public String getPath() {
@ -152,7 +161,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
int lastReadBytes;
int totalReadBytes = 0;
if (streamStatistics != null) {
-streamStatistics.readOperationStarted(off, len);
+streamStatistics.readOperationStarted();
}
incrementReadOps();
do {
@ -431,7 +440,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
-op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
+op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
+StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+() -> client.read(path, position, b, offset, length,
+tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
@ -694,6 +706,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
return alwaysReadBufferSize;
}
+@Override
+public IOStatistics getIOStatistics() {
+return ioStatistics;
+}
/**
* Get the statistics of the stream.
* @return a string value.

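For context, the trackDuration() wrapper used in readRemote() above times the remote GET and records it against ACTION_HTTP_GET_REQUEST in the stream's statistics store. A hedged sketch of the same pattern outside AbfsInputStream; the store, method and class names are illustrative:

import java.io.IOException;

import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class TrackDurationSketch {

  public static byte[] timedFetch() throws IOException {
    // Build a store that knows how to track this duration statistic.
    IOStatisticsStore store = iostatisticsStore()
        .withDurationTracking(StoreStatisticNames.ACTION_HTTP_GET_REQUEST)
        .build();
    // trackDuration() times the lambda, updates the statistic, and still
    // propagates any IOException thrown by the wrapped call.
    return IOStatisticsBinding.trackDuration(store,
        StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
        () -> doRemoteGet());
  }

  // Stand-in for the real remote call (client.read(...) in AbfsInputStream).
  private static byte[] doRemoteGet() throws IOException {
    return new byte[0];
  }
}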
File: AbfsInputStreamStatistics.java

@ -19,12 +19,14 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Interface for statistics for the AbfsInputStream.
*/
@InterfaceStability.Unstable
-public interface AbfsInputStreamStatistics {
+public interface AbfsInputStreamStatistics extends IOStatisticsSource {
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
@ -73,11 +75,8 @@ public interface AbfsInputStreamStatistics {
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
-*
-* @param pos starting position of the read.
-* @param len length of bytes to read.
*/
-void readOperationStarted(long pos, long len);
+void readOperationStarted();
/**
* Records a successful remote read operation.
@ -96,6 +95,12 @@ public interface AbfsInputStreamStatistics {
*/
void remoteBytesRead(long bytes);
+/**
+* Get the IOStatisticsStore instance from AbfsInputStreamStatistics.
+* @return instance of IOStatisticsStore which extends IOStatistics.
+*/
+IOStatistics getIOStatistics();
/**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.

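Because the statistics interfaces now extend IOStatisticsSource, a holder of an AbfsInputStreamStatistics (or of the stream itself) can be handled generically. A sketch of taking a point-in-time copy for later reporting, assuming the IOStatisticsSnapshot class and the retrieveIOStatistics()/snapshotIOStatistics() helpers of the same API; the class name is illustrative:

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;

public final class SnapshotSketch {

  /**
   * Capture a frozen copy of a source's statistics; the snapshot does not
   * change as the stream keeps reading.
   */
  public static IOStatisticsSnapshot capture(IOStatisticsSource source) {
    IOStatistics live = retrieveIOStatistics(source);
    return live == null ? null : snapshotIOStatistics(live);
  }
}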
File: AbfsInputStreamStatisticsImpl.java

@ -18,23 +18,50 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
/**
* Stats for the AbfsInputStream.
*/
public class AbfsInputStreamStatisticsImpl
implements AbfsInputStreamStatistics {
-private long seekOperations;
-private long forwardSeekOperations;
-private long backwardSeekOperations;
-private long bytesRead;
-private long bytesSkippedOnSeek;
-private long bytesBackwardsOnSeek;
-private long seekInBuffer;
-private long readOperations;
-private long bytesReadFromBuffer;
-private long remoteReadOperations;
-private long readAheadBytesRead;
-private long remoteBytesRead;
+private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+.withCounters(
+StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS,
+StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS,
+StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS,
+StreamStatisticNames.STREAM_READ_BYTES,
+StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
+StreamStatisticNames.STREAM_READ_OPERATIONS,
+StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS,
+StreamStatisticNames.SEEK_IN_BUFFER,
+StreamStatisticNames.BYTES_READ_BUFFER,
+StreamStatisticNames.REMOTE_READ_OP,
+StreamStatisticNames.READ_AHEAD_BYTES_READ,
+StreamStatisticNames.REMOTE_BYTES_READ
+)
+.withDurationTracking(ACTION_HTTP_GET_REQUEST)
+.build();
+/* Reference to the atomic counter for frequently updated counters to avoid
+* cost of the map lookup on every increment.
+*/
+private final AtomicLong bytesRead =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_BYTES);
+private final AtomicLong readOps =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_OPERATIONS);
+private final AtomicLong seekOps =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
/**
* Seek backwards, incrementing the seek and backward seek counters.
@ -44,9 +71,9 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekBackwards(long negativeOffset) {
-seekOperations++;
-backwardSeekOperations++;
-bytesBackwardsOnSeek -= negativeOffset;
+seekOps.incrementAndGet();
+ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
+ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS, negativeOffset);
}
/**
@ -58,11 +85,9 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekForwards(long skipped) {
-seekOperations++;
-forwardSeekOperations++;
-if (skipped > 0) {
-bytesSkippedOnSeek += skipped;
-}
+seekOps.incrementAndGet();
+ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
+ioStatisticsStore.incrementCounter(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, skipped);
}
/**
@ -90,9 +115,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void bytesRead(long bytes) {
-if (bytes > 0) {
-bytesRead += bytes;
-}
+bytesRead.addAndGet(bytes);
}
/**
@ -104,9 +127,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void bytesReadFromBuffer(long bytes) {
-if (bytes > 0) {
-bytesReadFromBuffer += bytes;
-}
+ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_READ_BUFFER, bytes);
}
/**
@ -116,18 +137,15 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void seekInBuffer() {
-seekInBuffer++;
+ioStatisticsStore.incrementCounter(StreamStatisticNames.SEEK_IN_BUFFER);
}
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
-*
-* @param pos starting position of the read.
-* @param len length of bytes to read.
*/
@Override
-public void readOperationStarted(long pos, long len) {
-readOperations++;
+public void readOperationStarted() {
+readOps.incrementAndGet();
}
/**
@ -137,9 +155,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void readAheadBytesRead(long bytes) {
-if (bytes > 0) {
-readAheadBytesRead += bytes;
-}
+ioStatisticsStore.incrementCounter(StreamStatisticNames.READ_AHEAD_BYTES_READ, bytes);
}
/**
@ -149,9 +165,7 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void remoteBytesRead(long bytes) {
-if (bytes > 0) {
-remoteBytesRead += bytes;
-}
+ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_BYTES_READ, bytes);
}
/**
@ -161,55 +175,88 @@ public class AbfsInputStreamStatisticsImpl
*/
@Override
public void remoteReadOperation() {
-remoteReadOperations++;
+ioStatisticsStore.incrementCounter(StreamStatisticNames.REMOTE_READ_OP);
}
+/**
+* Getter for IOStatistics instance used.
+* @return IOStatisticsStore instance which extends IOStatistics.
+*/
+@Override
+public IOStatistics getIOStatistics() {
+return ioStatisticsStore;
+}
+@VisibleForTesting
public long getSeekOperations() {
-return seekOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
}
+@VisibleForTesting
public long getForwardSeekOperations() {
-return forwardSeekOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
}
+@VisibleForTesting
public long getBackwardSeekOperations() {
-return backwardSeekOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BACKWARD_OPERATIONS);
}
+@VisibleForTesting
public long getBytesRead() {
-return bytesRead;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_BYTES);
}
+@VisibleForTesting
public long getBytesSkippedOnSeek() {
-return bytesSkippedOnSeek;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED);
}
+@VisibleForTesting
public long getBytesBackwardsOnSeek() {
-return bytesBackwardsOnSeek;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_SEEK_BYTES_BACKWARDS);
}
+@VisibleForTesting
public long getSeekInBuffer() {
-return seekInBuffer;
+return ioStatisticsStore.counters().get(StreamStatisticNames.SEEK_IN_BUFFER);
}
+@VisibleForTesting
public long getReadOperations() {
-return readOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.STREAM_READ_OPERATIONS);
}
+@VisibleForTesting
public long getBytesReadFromBuffer() {
-return bytesReadFromBuffer;
+return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_READ_BUFFER);
}
+@VisibleForTesting
public long getRemoteReadOperations() {
-return remoteReadOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_READ_OP);
}
+@VisibleForTesting
public long getReadAheadBytesRead() {
-return readAheadBytesRead;
+return ioStatisticsStore.counters().get(StreamStatisticNames.READ_AHEAD_BYTES_READ);
}
+@VisibleForTesting
public long getRemoteBytesRead() {
-return remoteBytesRead;
+return ioStatisticsStore.counters().get(StreamStatisticNames.REMOTE_BYTES_READ);
}
+/**
+* Getter for the mean value of the time taken to complete a HTTP GET
+* request by AbfsInputStream.
+* @return mean value.
+*/
+@VisibleForTesting
+public double getActionHttpGetRequest() {
+return ioStatisticsStore.meanStatistics().
+get(ACTION_HTTP_GET_REQUEST + SUFFIX_MEAN).mean();
+}
/**
@ -223,18 +270,7 @@ public class AbfsInputStreamStatisticsImpl
public String toString() {
final StringBuilder sb = new StringBuilder(
"StreamStatistics{");
-sb.append(", SeekOperations=").append(seekOperations);
-sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
-sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
-sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
-sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
-sb.append(", seekInBuffer=").append(seekInBuffer);
-sb.append(", BytesRead=").append(bytesRead);
-sb.append(", ReadOperations=").append(readOperations);
-sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
-sb.append(", remoteReadOperations=").append(remoteReadOperations);
-sb.append(", readAheadBytesRead=").append(readAheadBytesRead);
-sb.append(", remoteBytesRead=").append(remoteBytesRead);
+sb.append(ioStatisticsStore.toString());
sb.append('}');
return sb.toString();
}

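The implementation above is built entirely on the iostatisticsStore() builder. A condensed sketch of the same pattern, using only calls visible in this change; the counter names and class name are illustrative:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class CounterStoreSketch {

  // Declare every counter up front; duration-tracked names also get
  // derived statistics maintained for them.
  private final IOStatisticsStore store = iostatisticsStore()
      .withCounters("demo_ops", "demo_bytes")
      .withDurationTracking("demo_request")
      .build();

  // Hot-path counters can be cached as AtomicLong references so that
  // each increment avoids a map lookup.
  private final AtomicLong ops = store.getCounterReference("demo_ops");

  public void onOperation(long bytes) {
    ops.incrementAndGet();
    store.incrementCounter("demo_bytes", bytes);
  }

  public long bytes() {
    return store.counters().get("demo_bytes");
  }
}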
File: AbfsOutputStream.java

@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -43,6 +42,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
@ -57,7 +62,8 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestPara
/**
* The BlobFsOutputStream for Rest AbfsClient.
*/
-public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+public class AbfsOutputStream extends OutputStream implements Syncable,
+StreamCapabilities, IOStatisticsSource {
private final AbfsClient client;
private final String path;
@ -97,6 +103,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
+private IOStatistics ioStatistics;
private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);
@ -144,6 +151,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+if (outputStreamStatistics != null) {
+this.ioStatistics = outputStreamStatistics.getIOStatistics();
+}
}
/**
@ -354,11 +364,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (bufferIndex == 0) {
return;
}
-outputStreamStatistics.writeCurrentBuffer();
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
+if (outputStreamStatistics != null) {
+outputStreamStatistics.writeCurrentBuffer();
outputStreamStatistics.bytesToUpload(bytesLength);
+}
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
@ -370,7 +381,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
bytesLength, APPEND_MODE, true);
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
+if (outputStreamStatistics != null) {
outputStreamStatistics.uploadSuccessful(bytesLength);
+}
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
@ -402,26 +415,34 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
if (bufferIndex == 0) {
return;
}
-outputStreamStatistics.writeCurrentBuffer();
numOfAppendsToServerSinceLastFlush++;
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
+if (outputStreamStatistics != null) {
+outputStreamStatistics.writeCurrentBuffer();
outputStreamStatistics.bytesToUpload(bytesLength);
+}
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
-long start = System.currentTimeMillis();
+//Tracking time spent on waiting for task to complete.
+if (outputStreamStatistics != null) {
+try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
waitForTaskToComplete();
-outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis());
}
+} else {
+waitForTaskToComplete();
+}
+}
-final Future<Void> job = completionService.submit(new Callable<Void>() {
-@Override
-public Void call() throws Exception {
+final Future<Void> job =
+completionService.submit(IOStatisticsBinding
+.trackDurationOfCallable((IOStatisticsStore) ioStatistics,
+StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+() -> {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
@ -432,26 +453,26 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
} else if (isFlush) {
mode = FLUSH_MODE;
}
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false);
AbfsRestOperation op = client.append(path, bytes, reqParams,
cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return null;
}
-}
-});
+})
+);
+if (outputStreamStatistics != null) {
if (job.isCancelled()) {
outputStreamStatistics.uploadFailed(bytesLength);
} else {
outputStreamStatistics.uploadSuccessful(bytesLength);
}
+}
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
@ -527,8 +548,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
lastTotalAppendOffset += writeOperations.peek().length;
writeOperations.remove();
// Incrementing statistics to indicate queue has been shrunk.
+if (outputStreamStatistics != null) {
outputStreamStatistics.queueShrunk();
}
+}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
lastError = (AzureBlobFileSystemException) e.getCause();
@ -615,6 +638,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
return isAppendBlob;
}
+@Override
+public IOStatistics getIOStatistics() {
+return ioStatistics;
+}
/**
* Appending AbfsOutputStream statistics to base toString().
*
@ -623,9 +651,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
-sb.append("AbfsOuputStream@").append(this.hashCode()).append("){");
+if (outputStreamStatistics != null) {
+sb.append("AbfsOutputStream@").append(this.hashCode());
+sb.append("){");
sb.append(outputStreamStatistics.toString());
sb.append("}");
+}
return sb.toString();
}
}

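Two duration-tracking idioms appear in the hunk above: a try-with-resources DurationTracker for the queue wait, and trackDurationOfCallable() to time work submitted to an executor. A hedged sketch of both, with illustrative statistic names and class name:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class DurationTrackingSketch {

  private final IOStatisticsStore store = iostatisticsStore()
      .withDurationTracking("demo_wait", "demo_put")
      .build();

  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  public void blockingWait() throws InterruptedException {
    // Closing the tracker (end of the try block) records the elapsed time.
    try (DurationTracker ignored = store.trackDuration("demo_wait")) {
      Thread.sleep(10);
    }
  }

  public Future<Integer> submitTimed() {
    // Wraps the callable so the duration is measured when it actually runs.
    Callable<Integer> timed = IOStatisticsBinding.trackDurationOfCallable(
        store, "demo_put", () -> 42);
    return executor.submit(timed);
  }
}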
File: AbfsOutputStreamStatistics.java

@ -19,12 +19,15 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Interface for {@link AbfsOutputStream} statistics.
*/
@InterfaceStability.Unstable
-public interface AbfsOutputStreamStatistics {
+public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
/**
* Number of bytes to be uploaded.
@ -49,11 +52,9 @@ public interface AbfsOutputStreamStatistics {
/**
* Time spent in waiting for tasks to be completed in the blocking queue.
-*
-* @param start millisecond at which the wait for task to be complete begins.
-* @param end millisecond at which the wait is completed for the task.
+* @return instance of the DurationTracker that tracks the time for waiting.
*/
-void timeSpentTaskWait(long start, long end);
+DurationTracker timeSpentTaskWait();
/**
* Number of times task queue is shrunk.
@ -65,6 +66,12 @@ public interface AbfsOutputStreamStatistics {
*/
void writeCurrentBuffer();
+/**
+* Get the IOStatisticsStore instance from AbfsOutputStreamStatistics.
+* @return instance of IOStatisticsStore which extends IOStatistics.
+*/
+IOStatistics getIOStatistics();
/**
* Method to form a string of all AbfsOutputStream statistics and their
* values.

File: AbfsOutputStreamStatisticsImpl.java

@ -18,32 +18,47 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
/**
* OutputStream statistics implementation for Abfs.
*/
public class AbfsOutputStreamStatisticsImpl
implements AbfsOutputStreamStatistics {
-private long bytesToUpload;
-private long bytesUploadSuccessful;
-private long bytesUploadFailed;
-/**
-* Counter to get the total time spent while waiting for tasks to complete
-* in the blocking queue inside the thread executor.
-*/
-private long timeSpentOnTaskWait;
-/**
-* Counter to get the total number of queue shrink operations done {@code
-* AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to
-* remove the write operations which were successfully done by
-* AbfsOutputStream from the task queue.
-*/
-private long queueShrunkOps;
-/**
-* Counter to get the total number of times the current buffer is written
-* to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via
-* AbfsClient and appended to the data store by AbfsRestOperation.
-*/
-private long writeCurrentBufferOperations;
+private final IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+.withCounters(
+StreamStatisticNames.BYTES_TO_UPLOAD,
+StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
+StreamStatisticNames.BYTES_UPLOAD_FAILED,
+StreamStatisticNames.QUEUE_SHRUNK_OPS,
+StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
+)
+.withDurationTracking(
+StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
+StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT
+)
+.build();
+/* Reference to the atomic counter for frequently updated counters to avoid
+* cost of the map lookup on every increment.
+*/
+private final AtomicLong bytesUpload =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_TO_UPLOAD);
+private final AtomicLong bytesUploadedSuccessfully =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
+private final AtomicLong writeCurrentBufferOps =
+ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
/**
* Records the need to upload bytes and increments the total bytes that
@ -53,9 +68,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void bytesToUpload(long bytes) {
-if (bytes > 0) {
-bytesToUpload += bytes;
-}
+bytesUpload.addAndGet(bytes);
}
/**
@ -66,9 +79,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void uploadSuccessful(long bytes) {
-if (bytes > 0) {
-bytesUploadSuccessful += bytes;
-}
+bytesUploadedSuccessfully.addAndGet(bytes);
}
/**
@ -78,9 +89,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void uploadFailed(long bytes) {
-if (bytes > 0) {
-bytesUploadFailed += bytes;
-}
+ioStatisticsStore.incrementCounter(StreamStatisticNames.BYTES_UPLOAD_FAILED, bytes);
}
/**
@ -96,14 +105,10 @@ public class AbfsOutputStreamStatisticsImpl
* This time spent while waiting for the task to be completed is being
* recorded in this counter.
*
-* @param startTime time(in milliseconds) before the wait for task to be
-* completed is begin.
-* @param endTime time(in milliseconds) after the wait for the task to be
-* completed is done.
*/
@Override
-public void timeSpentTaskWait(long startTime, long endTime) {
-timeSpentOnTaskWait += endTime - startTime;
+public DurationTracker timeSpentTaskWait() {
+return ioStatisticsStore.trackDuration(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
}
/**
@ -114,7 +119,7 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void queueShrunk() {
-queueShrunkOps++;
+ioStatisticsStore.incrementCounter(StreamStatisticNames.QUEUE_SHRUNK_OPS);
}
/**
@ -125,31 +130,59 @@ public class AbfsOutputStreamStatisticsImpl
*/
@Override
public void writeCurrentBuffer() {
-writeCurrentBufferOperations++;
+writeCurrentBufferOps.incrementAndGet();
}
+/**
+* {@inheritDoc}
+*
+* A getter for IOStatisticsStore instance which extends IOStatistics.
+*
+* @return IOStatisticsStore instance.
+*/
+@Override
+public IOStatistics getIOStatistics() {
+return ioStatisticsStore;
+}
+@VisibleForTesting
public long getBytesToUpload() {
-return bytesToUpload;
+return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_TO_UPLOAD);
}
+@VisibleForTesting
public long getBytesUploadSuccessful() {
-return bytesUploadSuccessful;
+return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL);
}
+@VisibleForTesting
public long getBytesUploadFailed() {
-return bytesUploadFailed;
+return ioStatisticsStore.counters().get(StreamStatisticNames.BYTES_UPLOAD_FAILED);
}
+@VisibleForTesting
public long getTimeSpentOnTaskWait() {
-return timeSpentOnTaskWait;
+return ioStatisticsStore.counters().get(StreamStatisticNames.TIME_SPENT_ON_TASK_WAIT);
}
+@VisibleForTesting
public long getQueueShrunkOps() {
-return queueShrunkOps;
+return ioStatisticsStore.counters().get(StreamStatisticNames.QUEUE_SHRUNK_OPS);
}
+@VisibleForTesting
public long getWriteCurrentBufferOperations() {
-return writeCurrentBufferOperations;
+return ioStatisticsStore.counters().get(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
}
+/**
+* Getter for mean value of time taken to complete a PUT request by
+* AbfsOutputStream.
+* @return mean value.
+*/
+@VisibleForTesting
+public double getTimeSpentOnPutRequest() {
+return ioStatisticsStore.meanStatistics().get(StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST + StoreStatisticNames.SUFFIX_MEAN).mean();
+}
/**
@ -160,16 +193,7 @@ public class AbfsOutputStreamStatisticsImpl
@Override public String toString() {
final StringBuilder outputStreamStats = new StringBuilder(
"OutputStream Statistics{");
-outputStreamStats.append(", bytes_upload=").append(bytesToUpload);
-outputStreamStats.append(", bytes_upload_successfully=")
-.append(bytesUploadSuccessful);
-outputStreamStats.append(", bytes_upload_failed=")
-.append(bytesUploadFailed);
-outputStreamStats.append(", time_spent_task_wait=")
-.append(timeSpentOnTaskWait);
-outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps);
-outputStreamStats.append(", write_current_buffer_ops=")
-.append(writeCurrentBufferOperations);
+outputStreamStats.append(ioStatisticsStore.toString());
outputStreamStats.append("}");
return outputStreamStats.toString();
}

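The getTimeSpentOnPutRequest() getter above shows how duration statistics are surfaced: each tracked name gains derived entries, and the mean is stored under the name plus StoreStatisticNames.SUFFIX_MEAN. A small sketch of that lookup, with an illustrative statistic name and class name:

import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class MeanLookupSketch {

  public static double meanOf(String name) {
    IOStatisticsStore store = iostatisticsStore()
        .withDurationTracking(name)
        .build();
    // ... operations tracked against "name" would run here ...
    MeanStatistic mean = store.meanStatistics()
        .get(name + StoreStatisticNames.SUFFIX_MEAN);
    return mean == null ? 0.0 : mean.mean();
  }
}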
File: ITestAbfsInputStreamStatistics.java

@ -139,10 +139,9 @@ public class ITestAbfsInputStreamStatistics
* forwardSeekOps - Since we are doing a forward seek inside a loop
* for OPERATION times, total forward seeks would be OPERATIONS.
*
-* bytesBackwardsOnSeek - Since we are doing backward seeks from end of
-* file in a ONE_MB file each time, this would mean the bytes from
-* backward seek would be OPERATIONS * ONE_MB. Since this is backward
-* seek this value is expected be to be negative.
+* negativeBytesBackwardsOnSeek - Since we are doing backward seeks from
+* end of file in a ONE_MB file each time, this would mean the bytes from
+* backward seek would be OPERATIONS * ONE_MB.
*
* bytesSkippedOnSeek - Since, we move from start to end in seek, but
* our fCursor(position of cursor) always remain at end of file, this
@ -160,7 +159,7 @@ public class ITestAbfsInputStreamStatistics
assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
stats.getForwardSeekOperations());
assertEquals("Mismatch in bytesBackwardsOnSeek value",
--1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
+OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
assertEquals("Mismatch in bytesSkippedOnSeek value",
0, stats.getBytesSkippedOnSeek());
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
@ -366,6 +365,40 @@ public class ITestAbfsInputStreamStatistics
}
}
/**
* Testing time taken by AbfsInputStream to complete a GET request.
*/
@Test
public void testActionHttpGetRequest() throws IOException {
describe("Test to check the correct value of Time taken by http get "
+ "request in AbfsInputStream");
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
Path actionHttpGetRequestPath = path(getMethodName());
AbfsInputStream abfsInputStream = null;
AbfsOutputStream abfsOutputStream = null;
try {
abfsOutputStream = createAbfsOutputStreamWithFlushEnabled(fs,
actionHttpGetRequestPath);
abfsOutputStream.write('a');
abfsOutputStream.hflush();
abfsInputStream =
abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics());
abfsInputStream.read();
AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
(AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString());
Assertions.assertThat(
abfsInputStreamStatistics.getActionHttpGetRequest())
.describedAs("Mismatch in time taken by a GET request")
.isGreaterThan(0.0);
} finally {
IOUtils.cleanupWithLogger(LOG, abfsInputStream, abfsOutputStream);
}
}
/**
* Method to assert the initial values of the statistics.
*

File: ITestAbfsOutputStreamStatistics.java

@ -20,7 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
@ -31,7 +34,10 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
*/
public class ITestAbfsOutputStreamStatistics
extends AbstractAbfsIntegrationTest {
private static final int OPERATIONS = 10;
+private static final Logger LOG =
+LoggerFactory.getLogger(ITestAbfsOutputStreamStatistics.class);
public ITestAbfsOutputStreamStatistics() throws Exception {
}
@ -219,6 +225,31 @@ public class ITestAbfsOutputStreamStatistics
}
}
/**
* Test to check correct value of time spent on a PUT request in
* AbfsOutputStream.
*/
@Test
public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException {
describe("Testing to check if DurationTracker for PUT request is working "
+ "correctly.");
AzureBlobFileSystem fs = getFileSystem();
Path pathForPutRequest = path(getMethodName());
try(AbfsOutputStream outputStream =
createAbfsOutputStreamWithFlushEnabled(fs, pathForPutRequest)) {
outputStream.write('a');
outputStream.hflush();
AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics =
getAbfsOutputStreamStatistics(outputStream);
LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString());
Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest())
.describedAs("Mismatch in timeSpentOnPutRequest DurationTracker")
.isGreaterThan(0.0);
}
}
/**
* Method to get the AbfsOutputStream statistics.
*

File: TestAbfsOutputStreamStatistics.java

@ -94,17 +94,11 @@ public class TestAbfsOutputStreamStatistics
assertEquals("Mismatch in time spent on waiting for tasks to complete", 0,
abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
-int smallRandomStartTime =
-new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE);
-int smallRandomEndTime =
-new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE)
-+ smallRandomStartTime;
-int smallDiff = smallRandomEndTime - smallRandomStartTime;
abfsOutputStreamStatistics
-.timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime);
+.timeSpentTaskWait();
-//Test for small random value of timeSpentWaitTask.
+//Test for one op call value of timeSpentWaitTask.
assertEquals("Mismatch in time spent on waiting for tasks to complete",
-smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
+1, abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
//Reset statistics for the next test.
abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl();
@ -113,23 +107,16 @@ public class TestAbfsOutputStreamStatistics
* Entering multiple values for timeSpentTaskWait() to check the
* summation is happening correctly. Also calculating the expected result.
*/
-int expectedRandomDiff = 0;
for (int i = 0; i < OPERATIONS; i++) {
-int largeRandomStartTime =
-new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE);
-int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE)
-+ largeRandomStartTime;
-abfsOutputStreamStatistics
-.timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime);
-expectedRandomDiff += largeRandomEndTime - largeRandomStartTime;
+abfsOutputStreamStatistics.timeSpentTaskWait();
}
/*
-* Test to check correct value of timeSpentTaskWait after multiple
-* random values are passed in it.
+* Test to check correct value of timeSpentTaskWait after OPERATIONS
+* number of op calls.
*/
assertEquals("Mismatch in time spent on waiting for tasks to complete",
-expectedRandomDiff,
+OPERATIONS,
abfsOutputStreamStatistics.getTimeSpentOnTaskWait());
}
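The updated assertions rely on how duration tracking stores its data: the plain counter under TIME_SPENT_ON_TASK_WAIT now records the number of tracked invocations (hence the expected values of 1 and OPERATIONS), while the elapsed times feed the derived min/max/mean entries. A small sketch of that behaviour, using only classes touched by this commit; the class name is illustrative:

import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;

public final class TaskWaitCounterSketch {
  public static void main(String[] args) {
    AbfsOutputStreamStatisticsImpl stats = new AbfsOutputStreamStatisticsImpl();
    for (int i = 0; i < 5; i++) {
      // Each tracker that is opened and closed adds one invocation and
      // contributes its elapsed time to the mean/min/max statistics.
      try (DurationTracker ignored = stats.timeSpentTaskWait()) {
        // simulated wait for a queued task
      }
    }
    // Counts invocations, not milliseconds: prints 5 here.
    System.out.println(stats.getTimeSpentOnTaskWait());
  }
}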