HADOOP-15917. AliyunOSS: fix incorrect ReadOps and WriteOps in statistics. Contributed by Jinhu Wu.

(cherry picked from commit 3fade865ce)
(cherry picked from commit 64cb97fb44)
(cherry picked from commit 5d532cfc6f)
This commit is contained in:
Sammi Chen 2018-11-14 12:58:57 +08:00
parent 4039840510
commit 37082a664a
4 changed files with 84 additions and 19 deletions

View File

@ -405,7 +405,6 @@ public class AliyunOSSFileSystem extends FileSystem {
ObjectListing objects = store.listObjects(key, maxKeys, null, false);
while (true) {
statistics.incrementReadOps(1);
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey();
if (objKey.equals(key + "/")) {
@ -446,7 +445,6 @@ public class AliyunOSSFileSystem extends FileSystem {
}
String nextMarker = objects.getNextMarker();
objects = store.listObjects(key, maxKeys, nextMarker, false);
statistics.incrementReadOps(1);
} else {
break;
}
@ -694,7 +692,6 @@ public class AliyunOSSFileSystem extends FileSystem {
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
statistics.incrementReadOps(1);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
@ -717,7 +714,6 @@ public class AliyunOSSFileSystem extends FileSystem {
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
statistics.incrementReadOps(1);
} else {
break;
}

View File

@ -175,6 +175,7 @@ public class AliyunOSSFileSystemStore {
CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL);
statistics.incrementWriteOps(1);
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
@ -216,6 +217,7 @@ public class AliyunOSSFileSystemStore {
// Here, we choose the simple mode to do batch delete.
deleteRequest.setQuiet(true);
DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
deleteFailed = result.getDeletedObjects();
tries++;
if (tries == retry) {
@ -268,11 +270,13 @@ public class AliyunOSSFileSystemStore {
*/
public ObjectMetadata getObjectMetadata(String key) {
try {
return ossClient.getObjectMetadata(bucketName, key);
} catch (OSSException osse) {
return null;
} finally {
ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key);
statistics.incrementReadOps(1);
return objectMeta;
} catch (OSSException osse) {
LOG.error("Exception thrown when get object meta: "
+ key + ", exception: " + osse);
return null;
}
}
@ -289,6 +293,7 @@ public class AliyunOSSFileSystemStore {
dirMeta.setContentLength(0);
try {
ossClient.putObject(bucketName, key, in, dirMeta);
statistics.incrementWriteOps(1);
} finally {
in.close();
}
@ -304,6 +309,7 @@ public class AliyunOSSFileSystemStore {
public boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
statistics.incrementReadOps(1);
long contentLength = objectMeta.getContentLength();
if (contentLength <= multipartThreshold) {
return singleCopy(srcKey, dstKey);
@ -323,6 +329,7 @@ public class AliyunOSSFileSystemStore {
private boolean singleCopy(String srcKey, String dstKey) {
CopyObjectResult copyResult =
ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
statistics.incrementWriteOps(1);
LOG.debug(copyResult.getETag());
return true;
}
@ -372,6 +379,7 @@ public class AliyunOSSFileSystemStore {
UploadPartCopyResult partCopyResult =
ossClient.uploadPartCopy(partCopyRequest);
statistics.incrementWriteOps(1);
statistics.incrementBytesWritten(size);
partETags.add(partCopyResult.getPartETag());
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
@ -408,6 +416,7 @@ public class AliyunOSSFileSystemStore {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag());
statistics.incrementWriteOps(1);
statistics.incrementBytesWritten(file.length());
} finally {
fis.close();
}
@ -449,7 +458,9 @@ public class AliyunOSSFileSystemStore {
try {
GetObjectRequest request = new GetObjectRequest(bucketName, key);
request.setRange(byteStart, byteEnd);
return ossClient.getObject(request).getObjectContent();
InputStream in = ossClient.getObject(request).getObjectContent();
statistics.incrementReadOps(1);
return in;
} catch (OSSException | ClientException e) {
LOG.error("Exception thrown when store retrieves key: "
+ key + ", exception: " + e);
@ -480,6 +491,7 @@ public class AliyunOSSFileSystemStore {
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
for (String dir: objects.getCommonPrefixes()) {
@ -604,6 +616,8 @@ public class AliyunOSSFileSystemStore {
uploadRequest.setPartSize(file.length());
uploadRequest.setPartNumber(idx);
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
statistics.incrementWriteOps(1);
statistics.incrementBytesWritten(file.length());
return uploadResult.getPartETag();
} catch (Exception e) {
LOG.debug("Failed to upload "+ file.getPath() +", " +

View File

@ -117,6 +117,11 @@ please raise your issues with them.
</description>
</property>
<property>
<name>fs.oss.impl</name>
<value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
</property>
<property>
<name>fs.oss.assumed.role.arn</name>
<description>

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.junit.Assert.assertEquals;
/**
* Tests regular and multi-part upload functionality for
@ -74,24 +75,73 @@ public class TestAliyunOSSBlockOutputStream {
@Test
public void testRegularUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
long size = 1024 * 1024;
FileSystem.Statistics statistics =
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
// This test is a little complicated for statistics, lifecycle is
// generateTestFile
// fs.create(getFileStatus) read 1
// output stream write write 1
// path exists(fs.exists) read 1
// verifyReceivedData
// fs.open(getFileStatus) read 1
// input stream read read 2(part size is 512K)
// fs.delete
// getFileStatus & delete & exists & create fake dir read 2, write 2
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
assertEquals(7, statistics.getReadOps());
assertEquals(size - 1, statistics.getBytesRead());
assertEquals(3, statistics.getWriteOps());
assertEquals(size - 1, statistics.getBytesWritten());
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
assertEquals(14, statistics.getReadOps());
assertEquals(2 * size - 1, statistics.getBytesRead());
assertEquals(6, statistics.getWriteOps());
assertEquals(2 * size - 1, statistics.getBytesWritten());
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
assertEquals(22, statistics.getReadOps());
assertEquals(3 * size, statistics.getBytesRead());
assertEquals(10, statistics.getWriteOps());
assertEquals(3 * size, statistics.getBytesWritten());
}
@Test
public void testMultiPartUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
6 * 1024 * 1024 - 1);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
6 * 1024 * 1024 + 1);
long size = 6 * 1024 * 1024;
FileSystem.Statistics statistics =
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
assertEquals(17, statistics.getReadOps());
assertEquals(size - 1, statistics.getBytesRead());
assertEquals(8, statistics.getWriteOps());
assertEquals(size - 1, statistics.getBytesWritten());
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
assertEquals(34, statistics.getReadOps());
assertEquals(2 * size - 1, statistics.getBytesRead());
assertEquals(16, statistics.getWriteOps());
assertEquals(2 * size - 1, statistics.getBytesWritten());
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
assertEquals(52, statistics.getReadOps());
assertEquals(3 * size, statistics.getBytesRead());
assertEquals(25, statistics.getWriteOps());
assertEquals(3 * size, statistics.getBytesWritten());
}
@Test
public void testMultiPartUploadConcurrent() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
50 * 1024 * 1024 - 1);
long size = 50 * 1024 * 1024 - 1;
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
FileSystem.Statistics statistics =
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
assertEquals(105, statistics.getReadOps());
assertEquals(size, statistics.getBytesRead());
assertEquals(52, statistics.getWriteOps());
assertEquals(size, statistics.getBytesWritten());
}
@Test