From 769babf47063c3f922c09f59ae9b9b94567c94f7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 20 Oct 2016 14:50:30 +0100 Subject: [PATCH] HADOOP-13735 ITestS3AFileContextStatistics.testStatistics() failing. Contributed by Pieter Reuse --- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 1532cde7fb1..0e899e3e705 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -547,7 +547,7 @@ public class S3AFileSystem extends FileSystem { progress, partSize, blockFactory, - instrumentation.newOutputStreamStatistics(), + instrumentation.newOutputStreamStatistics(statistics), new WriteOperationHelper(key) ), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 963c53facdf..fb8c85239d2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -37,6 +37,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.fs.FileSystem.Statistics; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -639,9 +640,8 @@ public class S3AInstrumentation { * Create a stream output statistics instance. * @return the new instance */ - - OutputStreamStatistics newOutputStreamStatistics() { - return new OutputStreamStatistics(); + OutputStreamStatistics newOutputStreamStatistics(Statistics statistics) { + return new OutputStreamStatistics(statistics); } /** @@ -677,6 +677,12 @@ public class S3AInstrumentation { private final AtomicLong queueDuration = new AtomicLong(0); private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0); + private Statistics statistics; + + public OutputStreamStatistics(Statistics statistics){ + this.statistics = statistics; + } + /** * Block is queued for upload. */ @@ -717,6 +723,7 @@ public class S3AInstrumentation { /** Intermediate report of bytes uploaded. */ void bytesTransferred(long byteCount) { bytesUploaded.addAndGet(byteCount); + statistics.incrementBytesWritten(byteCount); bytesPendingUpload.addAndGet(-byteCount); incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount); }