HADOOP-15323. AliyunOSS: Improve copy file performance for AliyunOSSFileSystemStore. Contributed wujinhu.

(cherry picked from commit 040a202b20)
This commit is contained in:
Weiwei Yang 2019-01-03 21:25:52 +08:00
parent f804d7d3de
commit 37e36617c5
8 changed files with 36 additions and 42 deletions

View File

@ -32,13 +32,16 @@ public class AliyunOSSCopyFileTask implements Runnable {
private AliyunOSSFileSystemStore store;
private String srcKey;
private long srcLen;
private String dstKey;
private AliyunOSSCopyFileContext copyFileContext;
public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
String srcKey, long srcLen,
String dstKey, AliyunOSSCopyFileContext copyFileContext) {
this.store = store;
this.srcKey = srcKey;
this.srcLen = srcLen;
this.dstKey = dstKey;
this.copyFileContext = copyFileContext;
}
@ -47,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable {
public void run() {
boolean fail = false;
try {
store.copyFile(srcKey, dstKey);
store.copyFile(srcKey, srcLen, dstKey);
} catch (Exception e) {
LOG.warn("Exception thrown when copy from "
+ srcKey + " to " + dstKey + ", exception: " + e);

View File

@ -653,7 +653,7 @@ public class AliyunOSSFileSystem extends FileSystem {
if (srcStatus.isDirectory()) {
copyDirectory(srcPath, dstPath);
} else {
copyFile(srcPath, dstPath);
copyFile(srcPath, srcStatus.getLen(), dstPath);
}
return srcPath.equals(dstPath) || delete(srcPath, true);
@ -664,13 +664,14 @@ public class AliyunOSSFileSystem extends FileSystem {
* (the caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcPath source path.
* @param srcLen source path length if it is a file.
* @param dstPath destination path.
* @return true if file is successfully copied.
*/
private boolean copyFile(Path srcPath, Path dstPath) {
private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
String srcKey = pathToKey(srcPath);
String dstKey = pathToKey(dstPath);
return store.copyFile(srcKey, dstKey);
return store.copyFile(srcKey, srcLen, dstKey);
}
/**
@ -709,7 +710,8 @@ public class AliyunOSSFileSystem extends FileSystem {
//copy operation just copies metadata, oss will support shallow copy
executorService.execute(new AliyunOSSCopyFileTask(
store, objectSummary.getKey(), newKey, copyFileContext));
store, objectSummary.getKey(),
objectSummary.getSize(), newKey, copyFileContext));
copiesToFinish++;
// No need to call lock() here.
// It's ok to copy one more file if the rename operation failed

View File

@ -87,7 +87,6 @@ public class AliyunOSSFileSystemStore {
private OSSClient ossClient;
private String bucketName;
private long uploadPartSize;
private long multipartThreshold;
private int maxKeys;
private String serverSideEncryptionAlgorithm;
@ -155,21 +154,10 @@ public class AliyunOSSFileSystemStore {
ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
serverSideEncryptionAlgorithm =
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
if (multipartThreshold < 5 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
multipartThreshold = 5 * 1024 * 1024;
}
if (multipartThreshold > 1024 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
multipartThreshold = 1024 * 1024 * 1024;
}
bucketName = uri.getHost();
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
@ -305,18 +293,19 @@ public class AliyunOSSFileSystemStore {
* Copy an object from source key to destination key.
*
* @param srcKey source key.
* @param srcLen source file length.
* @param dstKey destination key.
* @return true if file is successfully copied.
*/
public boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
statistics.incrementReadOps(1);
long contentLength = objectMeta.getContentLength();
if (contentLength <= multipartThreshold) {
public boolean copyFile(String srcKey, long srcLen, String dstKey) {
try {
//1, try single copy first
return singleCopy(srcKey, dstKey);
} else {
return multipartCopy(srcKey, contentLength, dstKey);
} catch (Exception e) {
//2, if failed(shallow copy not supported), then multi part copy
LOG.debug("Exception thrown when copy file: " + srcKey
+ ", exception: " + e + ", use multipartCopy instead");
return multipartCopy(srcKey, srcLen, dstKey);
}
}

View File

@ -282,7 +282,9 @@ please raise your issues with them.
<property>
<name>fs.oss.multipart.upload.threshold</name>
<value>20971520</value>
<description>Minimum size in bytes before we start a multipart uploads or copy.</description>
<description>Minimum size in bytes before we start a multipart uploads or copy.
Notice: This property is deprecated and will be removed in further version.
</description>
</property>
<property>

View File

@ -49,7 +49,6 @@ public class TestAliyunOSSBlockOutputStream {
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
conf.setInt(IO_CHUNK_BUFFER_SIZE,
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));

View File

@ -178,13 +178,13 @@ public class TestAliyunOSSFileSystemContract
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeFalse(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeFalse(copyFileContext.isCopyFailure());
@ -212,13 +212,13 @@ public class TestAliyunOSSFileSystemContract
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
//store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
@ -247,19 +247,19 @@ public class TestAliyunOSSFileSystemContract
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
//store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
store, srcOne.toUri().getPath().substring(1), data.length,
dstThree.toUri().getPath().substring(1), copyFileContext);
threeCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());

View File

@ -78,8 +78,6 @@ public class TestAliyunOSSFileSystemStore {
protected void writeRenameReadCompare(Path path, long len)
throws IOException, NoSuchAlgorithmException {
// If len > fs.oss.multipart.upload.threshold,
// we'll use a multipart upload copy
MessageDigest digest = MessageDigest.getInstance("MD5");
OutputStream out = new BufferedOutputStream(
new DigestOutputStream(fs.create(path, false), digest));
@ -92,10 +90,12 @@ public class TestAliyunOSSFileSystemStore {
assertTrue("Exists", fs.exists(path));
Path copyPath = path.suffix(".copy");
long start = System.currentTimeMillis();
fs.rename(path, copyPath);
assertTrue("Copy exists", fs.exists(copyPath));
// should less than 1 second
assertTrue(System.currentTimeMillis() - start < 1000);
// Download file from Aliyun OSS and compare the digest against the original
MessageDigest digest2 = MessageDigest.getInstance("MD5");
InputStream in = new BufferedInputStream(
@ -119,7 +119,7 @@ public class TestAliyunOSSFileSystemStore {
@Test
public void testLargeUpload()
throws IOException, NoSuchAlgorithmException {
// Multipart upload, multipart copy
writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
// Multipart upload, shallow copy
writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB
}
}

View File

@ -32,7 +32,6 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
@Override
protected Configuration createConfiguration() {
Configuration newConf = super.createConfiguration();
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
return newConf;
}