HADOOP-13735 ITestS3AFileContextStatistics.testStatistics() failing. Contributed by Pieter Reuse

This commit is contained in:
Steve Loughran 2016-10-20 14:50:30 +01:00
parent 73504b1bdc
commit 9ae270af02
2 changed files with 11 additions and 4 deletions

View File

@ -548,7 +548,7 @@ public class S3AFileSystem extends FileSystem {
progress, progress,
partSize, partSize,
blockFactory, blockFactory,
instrumentation.newOutputStreamStatistics(), instrumentation.newOutputStreamStatistics(statistics),
new WriteOperationHelper(key) new WriteOperationHelper(key)
), ),
null); null);

View File

@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem.Statistics;
import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.Statistic.*;
@ -639,9 +640,8 @@ public class S3AInstrumentation {
* Create a stream output statistics instance. * Create a stream output statistics instance.
* @return the new instance * @return the new instance
*/ */
OutputStreamStatistics newOutputStreamStatistics(Statistics statistics) {
OutputStreamStatistics newOutputStreamStatistics() { return new OutputStreamStatistics(statistics);
return new OutputStreamStatistics();
} }
/** /**
@ -677,6 +677,12 @@ public class S3AInstrumentation {
private final AtomicLong queueDuration = new AtomicLong(0); private final AtomicLong queueDuration = new AtomicLong(0);
private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0); private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
private Statistics statistics;
public OutputStreamStatistics(Statistics statistics){
this.statistics = statistics;
}
/** /**
* Block is queued for upload. * Block is queued for upload.
*/ */
@ -717,6 +723,7 @@ public class S3AInstrumentation {
/** Intermediate report of bytes uploaded. */ /** Intermediate report of bytes uploaded. */
void bytesTransferred(long byteCount) { void bytesTransferred(long byteCount) {
bytesUploaded.addAndGet(byteCount); bytesUploaded.addAndGet(byteCount);
statistics.incrementBytesWritten(byteCount);
bytesPendingUpload.addAndGet(-byteCount); bytesPendingUpload.addAndGet(-byteCount);
incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount); incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
} }