diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 4fbb6fb8b1f..9c4435c11f3 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -405,7 +405,6 @@ public class AliyunOSSFileSystem extends FileSystem {
ObjectListing objects = store.listObjects(key, maxKeys, null, false);
while (true) {
- statistics.incrementReadOps(1);
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey();
if (objKey.equals(key + "/")) {
@@ -446,7 +445,6 @@ public class AliyunOSSFileSystem extends FileSystem {
}
String nextMarker = objects.getNextMarker();
objects = store.listObjects(key, maxKeys, nextMarker, false);
- statistics.incrementReadOps(1);
} else {
break;
}
@@ -694,7 +692,6 @@ public class AliyunOSSFileSystem extends FileSystem {
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
- statistics.incrementReadOps(1);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
@@ -717,7 +714,6 @@ public class AliyunOSSFileSystem extends FileSystem {
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
- statistics.incrementReadOps(1);
} else {
break;
}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index 7639eb398ca..4fc1325278a 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -175,6 +175,7 @@ public class AliyunOSSFileSystemStore {
CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL);
+ statistics.incrementWriteOps(1);
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
@@ -216,6 +217,7 @@ public class AliyunOSSFileSystemStore {
// Here, we choose the simple mode to do batch delete.
deleteRequest.setQuiet(true);
DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
+ statistics.incrementWriteOps(1);
deleteFailed = result.getDeletedObjects();
tries++;
if (tries == retry) {
@@ -268,11 +270,13 @@ public class AliyunOSSFileSystemStore {
*/
public ObjectMetadata getObjectMetadata(String key) {
try {
- return ossClient.getObjectMetadata(bucketName, key);
- } catch (OSSException osse) {
- return null;
- } finally {
+ ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key);
statistics.incrementReadOps(1);
+ return objectMeta;
+ } catch (OSSException osse) {
+ LOG.error("Exception thrown when get object meta: "
+ + key + ", exception: " + osse);
+ return null;
}
}
@@ -289,6 +293,7 @@ public class AliyunOSSFileSystemStore {
dirMeta.setContentLength(0);
try {
ossClient.putObject(bucketName, key, in, dirMeta);
+ statistics.incrementWriteOps(1);
} finally {
in.close();
}
@@ -304,6 +309,7 @@ public class AliyunOSSFileSystemStore {
public boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
+ statistics.incrementReadOps(1);
long contentLength = objectMeta.getContentLength();
if (contentLength <= multipartThreshold) {
return singleCopy(srcKey, dstKey);
@@ -323,6 +329,7 @@ public class AliyunOSSFileSystemStore {
private boolean singleCopy(String srcKey, String dstKey) {
CopyObjectResult copyResult =
ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
+ statistics.incrementWriteOps(1);
LOG.debug(copyResult.getETag());
return true;
}
@@ -372,6 +379,7 @@ public class AliyunOSSFileSystemStore {
UploadPartCopyResult partCopyResult =
ossClient.uploadPartCopy(partCopyRequest);
statistics.incrementWriteOps(1);
+ statistics.incrementBytesWritten(size);
partETags.add(partCopyResult.getPartETag());
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
@@ -408,6 +416,7 @@ public class AliyunOSSFileSystemStore {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag());
statistics.incrementWriteOps(1);
+ statistics.incrementBytesWritten(file.length());
} finally {
fis.close();
}
@@ -449,7 +458,9 @@ public class AliyunOSSFileSystemStore {
try {
GetObjectRequest request = new GetObjectRequest(bucketName, key);
request.setRange(byteStart, byteEnd);
- return ossClient.getObject(request).getObjectContent();
+ InputStream in = ossClient.getObject(request).getObjectContent();
+ statistics.incrementReadOps(1);
+ return in;
} catch (OSSException | ClientException e) {
LOG.error("Exception thrown when store retrieves key: "
+ key + ", exception: " + e);
@@ -480,6 +491,7 @@ public class AliyunOSSFileSystemStore {
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
+ statistics.incrementWriteOps(1);
}
for (String dir: objects.getCommonPrefixes()) {
@@ -604,6 +616,8 @@ public class AliyunOSSFileSystemStore {
uploadRequest.setPartSize(file.length());
uploadRequest.setPartNumber(idx);
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
+ statistics.incrementWriteOps(1);
+ statistics.incrementBytesWritten(file.length());
return uploadResult.getPartETag();
} catch (Exception e) {
LOG.debug("Failed to upload "+ file.getPath() +", " +
diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
index 0c3131d49cf..87aa90bf89c 100644
--- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
+++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
@@ -117,6 +117,11 @@ please raise your issues with them.
+
+ fs.oss.impl
+ org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
+
+
fs.oss.assumed.role.arn
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
index 6fe6f03107f..c3387a3d846 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
+import static org.junit.Assert.assertEquals;
/**
* Tests regular and multi-part upload functionality for
@@ -74,24 +75,73 @@ public class TestAliyunOSSBlockOutputStream {
@Test
public void testRegularUpload() throws IOException {
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
+ long size = 1024 * 1024;
+ FileSystem.Statistics statistics =
+ FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+ // This test is a little complicated for statistics, lifecycle is
+ // generateTestFile
+ // fs.create(getFileStatus) read 1
+ // output stream write write 1
+ // path exists(fs.exists) read 1
+ // verifyReceivedData
+ // fs.open(getFileStatus) read 1
+ // input stream read read 2(part size is 512K)
+ // fs.delete
+ // getFileStatus & delete & exists & create fake dir read 2, write 2
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
+ assertEquals(7, statistics.getReadOps());
+ assertEquals(size - 1, statistics.getBytesRead());
+ assertEquals(3, statistics.getWriteOps());
+ assertEquals(size - 1, statistics.getBytesWritten());
+
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+ assertEquals(14, statistics.getReadOps());
+ assertEquals(2 * size - 1, statistics.getBytesRead());
+ assertEquals(6, statistics.getWriteOps());
+ assertEquals(2 * size - 1, statistics.getBytesWritten());
+
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
+
+ assertEquals(22, statistics.getReadOps());
+ assertEquals(3 * size, statistics.getBytesRead());
+ assertEquals(10, statistics.getWriteOps());
+ assertEquals(3 * size, statistics.getBytesWritten());
}
@Test
public void testMultiPartUpload() throws IOException {
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
- 6 * 1024 * 1024 - 1);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
- 6 * 1024 * 1024 + 1);
+ long size = 6 * 1024 * 1024;
+ FileSystem.Statistics statistics =
+ FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
+ assertEquals(17, statistics.getReadOps());
+ assertEquals(size - 1, statistics.getBytesRead());
+ assertEquals(8, statistics.getWriteOps());
+ assertEquals(size - 1, statistics.getBytesWritten());
+
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+ assertEquals(34, statistics.getReadOps());
+ assertEquals(2 * size - 1, statistics.getBytesRead());
+ assertEquals(16, statistics.getWriteOps());
+ assertEquals(2 * size - 1, statistics.getBytesWritten());
+
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
+ assertEquals(52, statistics.getReadOps());
+ assertEquals(3 * size, statistics.getBytesRead());
+ assertEquals(25, statistics.getWriteOps());
+ assertEquals(3 * size, statistics.getBytesWritten());
}
@Test
public void testMultiPartUploadConcurrent() throws IOException {
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
- 50 * 1024 * 1024 - 1);
+ long size = 50 * 1024 * 1024 - 1;
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+ FileSystem.Statistics statistics =
+ FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
+ assertEquals(105, statistics.getReadOps());
+ assertEquals(size, statistics.getBytesRead());
+ assertEquals(52, statistics.getWriteOps());
+ assertEquals(size, statistics.getBytesWritten());
}
@Test