From f34d6b937fc4882b57c88ad14b589d11b9de5c47 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 19 Mar 2019 09:55:32 +0800 Subject: [PATCH] HADOOP-16191. AliyunOSS: improvements for copyFile/copyDirectory and logging. Contributed by wujinhu. (cherry picked from commit 568d3ab8b65d1348dec9c971feffe200e6cba2ef) --- .../fs/aliyun/oss/AliyunOSSCopyFileTask.java | 2 +- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 8 +- .../aliyun/oss/AliyunOSSFileSystemStore.java | 7 +- .../oss/TestAliyunOSSFileSystemContract.java | 76 +++++++++++++++++++ 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java index 7be16467030..0de0031f5e4 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java @@ -50,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable { public void run() { boolean fail = false; try { - store.copyFile(srcKey, srcLen, dstKey); + fail = !store.copyFile(srcKey, srcLen, dstKey); } catch (Exception e) { LOG.warn("Exception thrown when copy from " + srcKey + " to " + dstKey + ", exception: " + e); diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index b4d10472eae..ad359c6c609 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -650,13 +650,15 @@ public class AliyunOSSFileSystem extends FileSystem { dstPath)); } } + + boolean succeed; if (srcStatus.isDirectory()) { - copyDirectory(srcPath, dstPath); + succeed = copyDirectory(srcPath, dstPath); } else { - copyFile(srcPath, srcStatus.getLen(), dstPath); + succeed = copyFile(srcPath, srcStatus.getLen(), dstPath); } - return srcPath.equals(dstPath) || delete(srcPath, true); + return srcPath.equals(dstPath) || (succeed && delete(srcPath, true)); } /** diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 1901929f3a0..242d3b12b3b 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -31,6 +31,7 @@ import com.aliyun.oss.model.CompleteMultipartUploadResult; import com.aliyun.oss.model.CopyObjectResult; import com.aliyun.oss.model.DeleteObjectsRequest; import com.aliyun.oss.model.DeleteObjectsResult; +import com.aliyun.oss.model.GenericRequest; import com.aliyun.oss.model.GetObjectRequest; import com.aliyun.oss.model.InitiateMultipartUploadRequest; import com.aliyun.oss.model.InitiateMultipartUploadResult; @@ -260,11 +261,13 @@ public class AliyunOSSFileSystemStore { */ public ObjectMetadata getObjectMetadata(String key) { try { - ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key); + GenericRequest request = new GenericRequest(bucketName, key); + request.setLogEnabled(false); + ObjectMetadata objectMeta = ossClient.getObjectMetadata(request); statistics.incrementReadOps(1); return objectMeta; } catch (OSSException osse) { - LOG.error("Exception thrown when get object meta: " + LOG.debug("Exception thrown when get object meta: " + key + ", exception: " + osse); return null; } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java index d428e7cd4a6..a83c6dac0ca 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java @@ -21,6 +21,7 @@ package org.apache.hadoop.fs.aliyun.oss; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemContractBaseTest; import org.apache.hadoop.fs.Path; @@ -359,4 +360,79 @@ public class TestAliyunOSSFileSystemContract } } + @Test + public void testRenameChangingDirShouldFail() throws Exception { + testRenameDir(true, false, false); + testRenameDir(true, true, true); + } + + @Test + public void testRenameDir() throws Exception { + testRenameDir(false, true, false); + testRenameDir(false, true, true); + } + + private void testRenameDir(boolean changing, boolean result, boolean empty) + throws Exception { + fs.getConf().setLong(Constants.FS_OSS_BLOCK_SIZE_KEY, 1024); + String key = "a/b/test.file"; + for (int i = 0; i < 100; i++) { + if (empty) { + fs.createNewFile(this.path(key + "." + i)); + } else { + createFile(this.path(key + "." + i)); + } + } + + Path srcPath = this.path("a"); + Path dstPath = this.path("b"); + TestRenameTask task = new TestRenameTask(fs, srcPath, dstPath); + Thread thread = new Thread(task); + thread.start(); + while (!task.isRunning()) { + Thread.sleep(1000); + } + + if (changing) { + fs.delete(this.path("a/b"), true); + } + + thread.join(); + assertEquals(result, task.isSucceed()); + } + + class TestRenameTask implements Runnable { + private FileSystem fs; + private Path srcPath; + private Path dstPath; + private boolean result; + private boolean running; + TestRenameTask(FileSystem fs, Path srcPath, Path dstPath) { + this.fs = fs; + this.srcPath = srcPath; + this.dstPath = dstPath; + this.result = false; + this.running = false; + } + + boolean isSucceed() { + return this.result; + } + + boolean isRunning() { + return this.running; + } + @Override + public void run() { + try { + running = true; + result = fs.rename(srcPath, dstPath); + } catch (Exception e) { + } + } + } + + protected int getGlobalTimeout() { + return 120 * 1000; + } }