HADOOP-15323. AliyunOSS: Improve copy file performance for AliyunOSSFileSystemStore. Contributed by wujinhu.
This commit is contained in:
parent
f660e5eaa3
commit
040a202b20
|
@ -32,13 +32,16 @@ public class AliyunOSSCopyFileTask implements Runnable {
|
||||||
|
|
||||||
private AliyunOSSFileSystemStore store;
|
private AliyunOSSFileSystemStore store;
|
||||||
private String srcKey;
|
private String srcKey;
|
||||||
|
private long srcLen;
|
||||||
private String dstKey;
|
private String dstKey;
|
||||||
private AliyunOSSCopyFileContext copyFileContext;
|
private AliyunOSSCopyFileContext copyFileContext;
|
||||||
|
|
||||||
public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
|
public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
|
||||||
String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
|
String srcKey, long srcLen,
|
||||||
|
String dstKey, AliyunOSSCopyFileContext copyFileContext) {
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.srcKey = srcKey;
|
this.srcKey = srcKey;
|
||||||
|
this.srcLen = srcLen;
|
||||||
this.dstKey = dstKey;
|
this.dstKey = dstKey;
|
||||||
this.copyFileContext = copyFileContext;
|
this.copyFileContext = copyFileContext;
|
||||||
}
|
}
|
||||||
|
@ -47,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable {
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean fail = false;
|
boolean fail = false;
|
||||||
try {
|
try {
|
||||||
store.copyFile(srcKey, dstKey);
|
store.copyFile(srcKey, srcLen, dstKey);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Exception thrown when copy from "
|
LOG.warn("Exception thrown when copy from "
|
||||||
+ srcKey + " to " + dstKey + ", exception: " + e);
|
+ srcKey + " to " + dstKey + ", exception: " + e);
|
||||||
|
|
|
@ -653,7 +653,7 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
if (srcStatus.isDirectory()) {
|
if (srcStatus.isDirectory()) {
|
||||||
copyDirectory(srcPath, dstPath);
|
copyDirectory(srcPath, dstPath);
|
||||||
} else {
|
} else {
|
||||||
copyFile(srcPath, dstPath);
|
copyFile(srcPath, srcStatus.getLen(), dstPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
return srcPath.equals(dstPath) || delete(srcPath, true);
|
return srcPath.equals(dstPath) || delete(srcPath, true);
|
||||||
|
@ -664,13 +664,14 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
* (the caller should make sure srcPath is a file and dstPath is valid)
|
* (the caller should make sure srcPath is a file and dstPath is valid)
|
||||||
*
|
*
|
||||||
* @param srcPath source path.
|
* @param srcPath source path.
|
||||||
|
* @param srcLen source path length if it is a file.
|
||||||
* @param dstPath destination path.
|
* @param dstPath destination path.
|
||||||
* @return true if file is successfully copied.
|
* @return true if file is successfully copied.
|
||||||
*/
|
*/
|
||||||
private boolean copyFile(Path srcPath, Path dstPath) {
|
private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
|
||||||
String srcKey = pathToKey(srcPath);
|
String srcKey = pathToKey(srcPath);
|
||||||
String dstKey = pathToKey(dstPath);
|
String dstKey = pathToKey(dstPath);
|
||||||
return store.copyFile(srcKey, dstKey);
|
return store.copyFile(srcKey, srcLen, dstKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -709,7 +710,8 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
|
|
||||||
//copy operation just copies metadata, oss will support shallow copy
|
//copy operation just copies metadata, oss will support shallow copy
|
||||||
executorService.execute(new AliyunOSSCopyFileTask(
|
executorService.execute(new AliyunOSSCopyFileTask(
|
||||||
store, objectSummary.getKey(), newKey, copyFileContext));
|
store, objectSummary.getKey(),
|
||||||
|
objectSummary.getSize(), newKey, copyFileContext));
|
||||||
copiesToFinish++;
|
copiesToFinish++;
|
||||||
// No need to call lock() here.
|
// No need to call lock() here.
|
||||||
// It's ok to copy one more file if the rename operation failed
|
// It's ok to copy one more file if the rename operation failed
|
||||||
|
|
|
@ -87,7 +87,6 @@ public class AliyunOSSFileSystemStore {
|
||||||
private OSSClient ossClient;
|
private OSSClient ossClient;
|
||||||
private String bucketName;
|
private String bucketName;
|
||||||
private long uploadPartSize;
|
private long uploadPartSize;
|
||||||
private long multipartThreshold;
|
|
||||||
private int maxKeys;
|
private int maxKeys;
|
||||||
private String serverSideEncryptionAlgorithm;
|
private String serverSideEncryptionAlgorithm;
|
||||||
|
|
||||||
|
@ -155,21 +154,10 @@ public class AliyunOSSFileSystemStore {
|
||||||
ossClient = new OSSClient(endPoint, provider, clientConf);
|
ossClient = new OSSClient(endPoint, provider, clientConf);
|
||||||
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
|
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
|
||||||
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||||
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
|
|
||||||
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
|
|
||||||
serverSideEncryptionAlgorithm =
|
serverSideEncryptionAlgorithm =
|
||||||
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
|
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
|
||||||
|
|
||||||
if (multipartThreshold < 5 * 1024 * 1024) {
|
|
||||||
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
|
|
||||||
multipartThreshold = 5 * 1024 * 1024;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (multipartThreshold > 1024 * 1024 * 1024) {
|
|
||||||
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
|
|
||||||
multipartThreshold = 1024 * 1024 * 1024;
|
|
||||||
}
|
|
||||||
|
|
||||||
bucketName = uri.getHost();
|
bucketName = uri.getHost();
|
||||||
|
|
||||||
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
|
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
|
||||||
|
@ -305,18 +293,19 @@ public class AliyunOSSFileSystemStore {
|
||||||
* Copy an object from source key to destination key.
|
* Copy an object from source key to destination key.
|
||||||
*
|
*
|
||||||
* @param srcKey source key.
|
* @param srcKey source key.
|
||||||
|
* @param srcLen source file length.
|
||||||
* @param dstKey destination key.
|
* @param dstKey destination key.
|
||||||
* @return true if file is successfully copied.
|
* @return true if file is successfully copied.
|
||||||
*/
|
*/
|
||||||
public boolean copyFile(String srcKey, String dstKey) {
|
public boolean copyFile(String srcKey, long srcLen, String dstKey) {
|
||||||
ObjectMetadata objectMeta =
|
try {
|
||||||
ossClient.getObjectMetadata(bucketName, srcKey);
|
//1, try single copy first
|
||||||
statistics.incrementReadOps(1);
|
|
||||||
long contentLength = objectMeta.getContentLength();
|
|
||||||
if (contentLength <= multipartThreshold) {
|
|
||||||
return singleCopy(srcKey, dstKey);
|
return singleCopy(srcKey, dstKey);
|
||||||
} else {
|
} catch (Exception e) {
|
||||||
return multipartCopy(srcKey, contentLength, dstKey);
|
//2, if that fails (shallow copy not supported), fall back to multipart copy
|
||||||
|
LOG.debug("Exception thrown when copy file: " + srcKey
|
||||||
|
+ ", exception: " + e + ", use multipartCopy instead");
|
||||||
|
return multipartCopy(srcKey, srcLen, dstKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -282,7 +282,9 @@ please raise your issues with them.
|
||||||
<property>
|
<property>
|
||||||
<name>fs.oss.multipart.upload.threshold</name>
|
<name>fs.oss.multipart.upload.threshold</name>
|
||||||
<value>20971520</value>
|
<value>20971520</value>
|
||||||
<description>Minimum size in bytes before we start a multipart uploads or copy.</description>
|
<description>Minimum size in bytes before we start a multipart upload or copy.
|
||||||
|
Notice: This property is deprecated and will be removed in a future version.
|
||||||
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
|
|
@ -49,7 +49,6 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
|
|
||||||
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
|
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
|
||||||
conf.setInt(IO_CHUNK_BUFFER_SIZE,
|
conf.setInt(IO_CHUNK_BUFFER_SIZE,
|
||||||
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
|
conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
|
||||||
|
|
|
@ -178,13 +178,13 @@ public class TestAliyunOSSFileSystemContract
|
||||||
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
||||||
store.storeEmptyFile("test/new/file/");
|
store.storeEmptyFile("test/new/file/");
|
||||||
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstOne.toUri().getPath().substring(1), copyFileContext);
|
dstOne.toUri().getPath().substring(1), copyFileContext);
|
||||||
oneCopyFileTask.run();
|
oneCopyFileTask.run();
|
||||||
assumeFalse(copyFileContext.isCopyFailure());
|
assumeFalse(copyFileContext.isCopyFailure());
|
||||||
|
|
||||||
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
||||||
twoCopyFileTask.run();
|
twoCopyFileTask.run();
|
||||||
assumeFalse(copyFileContext.isCopyFailure());
|
assumeFalse(copyFileContext.isCopyFailure());
|
||||||
|
@ -212,13 +212,13 @@ public class TestAliyunOSSFileSystemContract
|
||||||
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
||||||
//store.storeEmptyFile("test/new/file/");
|
//store.storeEmptyFile("test/new/file/");
|
||||||
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstOne.toUri().getPath().substring(1), copyFileContext);
|
dstOne.toUri().getPath().substring(1), copyFileContext);
|
||||||
oneCopyFileTask.run();
|
oneCopyFileTask.run();
|
||||||
assumeTrue(copyFileContext.isCopyFailure());
|
assumeTrue(copyFileContext.isCopyFailure());
|
||||||
|
|
||||||
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
||||||
twoCopyFileTask.run();
|
twoCopyFileTask.run();
|
||||||
assumeTrue(copyFileContext.isCopyFailure());
|
assumeTrue(copyFileContext.isCopyFailure());
|
||||||
|
@ -247,19 +247,19 @@ public class TestAliyunOSSFileSystemContract
|
||||||
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
|
||||||
//store.storeEmptyFile("test/new/file/");
|
//store.storeEmptyFile("test/new/file/");
|
||||||
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstOne.toUri().getPath().substring(1), copyFileContext);
|
dstOne.toUri().getPath().substring(1), copyFileContext);
|
||||||
oneCopyFileTask.run();
|
oneCopyFileTask.run();
|
||||||
assumeTrue(copyFileContext.isCopyFailure());
|
assumeTrue(copyFileContext.isCopyFailure());
|
||||||
|
|
||||||
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
dstTwo.toUri().getPath().substring(1), copyFileContext);
|
||||||
twoCopyFileTask.run();
|
twoCopyFileTask.run();
|
||||||
assumeTrue(copyFileContext.isCopyFailure());
|
assumeTrue(copyFileContext.isCopyFailure());
|
||||||
|
|
||||||
AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
|
AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
|
||||||
store, srcOne.toUri().getPath().substring(1),
|
store, srcOne.toUri().getPath().substring(1), data.length,
|
||||||
dstThree.toUri().getPath().substring(1), copyFileContext);
|
dstThree.toUri().getPath().substring(1), copyFileContext);
|
||||||
threeCopyFileTask.run();
|
threeCopyFileTask.run();
|
||||||
assumeTrue(copyFileContext.isCopyFailure());
|
assumeTrue(copyFileContext.isCopyFailure());
|
||||||
|
|
|
@ -78,8 +78,6 @@ public class TestAliyunOSSFileSystemStore {
|
||||||
|
|
||||||
protected void writeRenameReadCompare(Path path, long len)
|
protected void writeRenameReadCompare(Path path, long len)
|
||||||
throws IOException, NoSuchAlgorithmException {
|
throws IOException, NoSuchAlgorithmException {
|
||||||
// If len > fs.oss.multipart.upload.threshold,
|
|
||||||
// we'll use a multipart upload copy
|
|
||||||
MessageDigest digest = MessageDigest.getInstance("MD5");
|
MessageDigest digest = MessageDigest.getInstance("MD5");
|
||||||
OutputStream out = new BufferedOutputStream(
|
OutputStream out = new BufferedOutputStream(
|
||||||
new DigestOutputStream(fs.create(path, false), digest));
|
new DigestOutputStream(fs.create(path, false), digest));
|
||||||
|
@ -92,10 +90,12 @@ public class TestAliyunOSSFileSystemStore {
|
||||||
assertTrue("Exists", fs.exists(path));
|
assertTrue("Exists", fs.exists(path));
|
||||||
|
|
||||||
Path copyPath = path.suffix(".copy");
|
Path copyPath = path.suffix(".copy");
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
fs.rename(path, copyPath);
|
fs.rename(path, copyPath);
|
||||||
|
|
||||||
assertTrue("Copy exists", fs.exists(copyPath));
|
assertTrue("Copy exists", fs.exists(copyPath));
|
||||||
|
// should take less than 1 second
|
||||||
|
assertTrue(System.currentTimeMillis() - start < 1000);
|
||||||
// Download file from Aliyun OSS and compare the digest against the original
|
// Download file from Aliyun OSS and compare the digest against the original
|
||||||
MessageDigest digest2 = MessageDigest.getInstance("MD5");
|
MessageDigest digest2 = MessageDigest.getInstance("MD5");
|
||||||
InputStream in = new BufferedInputStream(
|
InputStream in = new BufferedInputStream(
|
||||||
|
@ -119,7 +119,7 @@ public class TestAliyunOSSFileSystemStore {
|
||||||
@Test
|
@Test
|
||||||
public void testLargeUpload()
|
public void testLargeUpload()
|
||||||
throws IOException, NoSuchAlgorithmException {
|
throws IOException, NoSuchAlgorithmException {
|
||||||
// Multipart upload, multipart copy
|
// Multipart upload, shallow copy
|
||||||
writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
|
writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
|
||||||
@Override
|
@Override
|
||||||
protected Configuration createConfiguration() {
|
protected Configuration createConfiguration() {
|
||||||
Configuration newConf = super.createConfiguration();
|
Configuration newConf = super.createConfiguration();
|
||||||
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
|
|
||||||
newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
|
newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
|
||||||
return newConf;
|
return newConf;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue