HADOOP-16191. AliyunOSS: improvements for copyFile/copyDirectory and logging. Contributed by wujinhu.
This commit is contained in:
parent
09eabda314
commit
568d3ab8b6
|
@ -50,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable {
|
||||||
public void run() {
|
public void run() {
|
||||||
boolean fail = false;
|
boolean fail = false;
|
||||||
try {
|
try {
|
||||||
store.copyFile(srcKey, srcLen, dstKey);
|
fail = !store.copyFile(srcKey, srcLen, dstKey);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Exception thrown when copy from "
|
LOG.warn("Exception thrown when copy from "
|
||||||
+ srcKey + " to " + dstKey + ", exception: " + e);
|
+ srcKey + " to " + dstKey + ", exception: " + e);
|
||||||
|
|
|
@ -650,13 +650,15 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
dstPath));
|
dstPath));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean succeed;
|
||||||
if (srcStatus.isDirectory()) {
|
if (srcStatus.isDirectory()) {
|
||||||
copyDirectory(srcPath, dstPath);
|
succeed = copyDirectory(srcPath, dstPath);
|
||||||
} else {
|
} else {
|
||||||
copyFile(srcPath, srcStatus.getLen(), dstPath);
|
succeed = copyFile(srcPath, srcStatus.getLen(), dstPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
return srcPath.equals(dstPath) || delete(srcPath, true);
|
return srcPath.equals(dstPath) || (succeed && delete(srcPath, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -31,6 +31,7 @@ import com.aliyun.oss.model.CompleteMultipartUploadResult;
|
||||||
import com.aliyun.oss.model.CopyObjectResult;
|
import com.aliyun.oss.model.CopyObjectResult;
|
||||||
import com.aliyun.oss.model.DeleteObjectsRequest;
|
import com.aliyun.oss.model.DeleteObjectsRequest;
|
||||||
import com.aliyun.oss.model.DeleteObjectsResult;
|
import com.aliyun.oss.model.DeleteObjectsResult;
|
||||||
|
import com.aliyun.oss.model.GenericRequest;
|
||||||
import com.aliyun.oss.model.GetObjectRequest;
|
import com.aliyun.oss.model.GetObjectRequest;
|
||||||
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
|
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
|
||||||
import com.aliyun.oss.model.InitiateMultipartUploadResult;
|
import com.aliyun.oss.model.InitiateMultipartUploadResult;
|
||||||
|
@ -260,11 +261,13 @@ public class AliyunOSSFileSystemStore {
|
||||||
*/
|
*/
|
||||||
public ObjectMetadata getObjectMetadata(String key) {
|
public ObjectMetadata getObjectMetadata(String key) {
|
||||||
try {
|
try {
|
||||||
ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key);
|
GenericRequest request = new GenericRequest(bucketName, key);
|
||||||
|
request.setLogEnabled(false);
|
||||||
|
ObjectMetadata objectMeta = ossClient.getObjectMetadata(request);
|
||||||
statistics.incrementReadOps(1);
|
statistics.incrementReadOps(1);
|
||||||
return objectMeta;
|
return objectMeta;
|
||||||
} catch (OSSException osse) {
|
} catch (OSSException osse) {
|
||||||
LOG.error("Exception thrown when get object meta: "
|
LOG.debug("Exception thrown when get object meta: "
|
||||||
+ key + ", exception: " + osse);
|
+ key + ", exception: " + osse);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.aliyun.oss;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
@ -359,4 +360,79 @@ public class TestAliyunOSSFileSystemContract
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameChangingDirShouldFail() throws Exception {
|
||||||
|
testRenameDir(true, false, false);
|
||||||
|
testRenameDir(true, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameDir() throws Exception {
|
||||||
|
testRenameDir(false, true, false);
|
||||||
|
testRenameDir(false, true, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testRenameDir(boolean changing, boolean result, boolean empty)
|
||||||
|
throws Exception {
|
||||||
|
fs.getConf().setLong(Constants.FS_OSS_BLOCK_SIZE_KEY, 1024);
|
||||||
|
String key = "a/b/test.file";
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
if (empty) {
|
||||||
|
fs.createNewFile(this.path(key + "." + i));
|
||||||
|
} else {
|
||||||
|
createFile(this.path(key + "." + i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Path srcPath = this.path("a");
|
||||||
|
Path dstPath = this.path("b");
|
||||||
|
TestRenameTask task = new TestRenameTask(fs, srcPath, dstPath);
|
||||||
|
Thread thread = new Thread(task);
|
||||||
|
thread.start();
|
||||||
|
while (!task.isRunning()) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (changing) {
|
||||||
|
fs.delete(this.path("a/b"), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
thread.join();
|
||||||
|
assertEquals(result, task.isSucceed());
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestRenameTask implements Runnable {
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path srcPath;
|
||||||
|
private Path dstPath;
|
||||||
|
private boolean result;
|
||||||
|
private boolean running;
|
||||||
|
TestRenameTask(FileSystem fs, Path srcPath, Path dstPath) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.srcPath = srcPath;
|
||||||
|
this.dstPath = dstPath;
|
||||||
|
this.result = false;
|
||||||
|
this.running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isSucceed() {
|
||||||
|
return this.result;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isRunning() {
|
||||||
|
return this.running;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
running = true;
|
||||||
|
result = fs.rename(srcPath, dstPath);
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected int getGlobalTimeout() {
|
||||||
|
return 120 * 1000;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue