diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java new file mode 100644 index 00000000000..a843805c21e --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Used by {@link AliyunOSSFileSystem} and {@link AliyunOSSCopyFileTask} + * as copy context. It contains some variables used in copy process. + */ +public class AliyunOSSCopyFileContext { + private final ReentrantLock lock = new ReentrantLock(); + + private Condition readyCondition = lock.newCondition(); + + private boolean copyFailure; + private int copiesFinish; + + public AliyunOSSCopyFileContext() { + copyFailure = false; + copiesFinish = 0; + } + + public void lock() { + lock.lock(); + } + + public void unlock() { + lock.unlock(); + } + + public void awaitAllFinish(int copiesToFinish) throws InterruptedException { + while (this.copiesFinish != copiesToFinish) { + readyCondition.await(); + } + } + + public void signalAll() { + readyCondition.signalAll(); + } + + public boolean isCopyFailure() { + return copyFailure; + } + + public void setCopyFailure(boolean copyFailure) { + this.copyFailure = copyFailure; + } + + public void incCopiesFinish() { + ++copiesFinish; + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java new file mode 100644 index 00000000000..42cd17bbd47 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used by {@link AliyunOSSFileSystem} as an task that submitted + * to the thread pool to accelerate the copy progress. + * Each AliyunOSSCopyFileTask copies one file from src path to dst path + */ +public class AliyunOSSCopyFileTask implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSCopyFileTask.class); + + private AliyunOSSFileSystemStore store; + private String srcKey; + private String dstKey; + private AliyunOSSCopyFileContext copyFileContext; + + public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store, + String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) { + this.store = store; + this.srcKey = srcKey; + this.dstKey = dstKey; + this.copyFileContext = copyFileContext; + } + + @Override + public void run() { + boolean fail = false; + try { + store.copyFile(srcKey, dstKey); + } catch (Exception e) { + LOG.warn("Exception thrown when copy from " + + srcKey + " to " + dstKey + ", exception: " + e); + fail = true; + } finally { + copyFileContext.lock(); + if (fail) { + copyFileContext.setCopyFailure(fail); + } + copyFileContext.incCopiesFinish(); + copyFileContext.signalAll(); + copyFileContext.unlock(); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index afff2237af9..b3c63d33535 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -24,9 +24,11 @@ import java.net.URI; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -70,7 +72,9 @@ public class AliyunOSSFileSystem extends FileSystem { private AliyunOSSFileSystemStore store; private int maxKeys; private int maxReadAheadPartNumber; + private int maxConcurrentCopyTasksPerDir; private ListeningExecutorService boundedThreadPool; + private ListeningExecutorService boundedCopyThreadPool; private static final PathFilter DEFAULT_FILTER = new PathFilter() { @Override @@ -90,6 +94,7 @@ public class AliyunOSSFileSystem extends FileSystem { try { store.close(); boundedThreadPool.shutdown(); + boundedCopyThreadPool.shutdown(); } finally { super.close(); } @@ -331,6 +336,23 @@ public class AliyunOSSFileSystem extends FileSystem { this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared"); + + maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf, + Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY, + Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT); + + int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf, + Constants.MAX_COPY_THREADS_NUM_KEY, + Constants.MAX_COPY_THREADS_DEFAULT); + + int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf, + Constants.MAX_COPY_TASKS_KEY, + Constants.MAX_COPY_TASKS_DEFAULT); + + this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance( + maxCopyThreads, maxCopyTasks, 60L, + TimeUnit.SECONDS, "oss-copy-unbounded"); + setConf(conf); } @@ -653,14 +675,30 @@ public class AliyunOSSFileSystem extends FileSystem { } store.storeEmptyFile(dstKey); + AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext(); + ExecutorService executorService = MoreExecutors.listeningDecorator( + new SemaphoredDelegatingExecutor(boundedCopyThreadPool, + maxConcurrentCopyTasksPerDir, true)); ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); statistics.incrementReadOps(1); // Copy files from src folder to dst + int copiesToFinish = 0; while (true) { for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { String newKey = dstKey.concat(objectSummary.getKey().substring(srcKey.length())); - store.copyFile(objectSummary.getKey(), newKey); + + //copy operation just copies metadata, oss will support shallow copy + executorService.execute(new AliyunOSSCopyFileTask( + store, objectSummary.getKey(), newKey, copyFileContext)); + copiesToFinish++; + // No need to call lock() here. + // It's ok to copy one more file if the rename operation failed + // Reduce the call of lock() can also improve our performance + if (copyFileContext.isCopyFailure()) { + //some error occurs, break + break; + } } if (objects.isTruncated()) { String nextMarker = objects.getNextMarker(); @@ -670,7 +708,16 @@ public class AliyunOSSFileSystem extends FileSystem { break; } } - return true; + //wait operations in progress to finish + copyFileContext.lock(); + try { + copyFileContext.awaitAllFinish(copiesToFinish); + } catch (InterruptedException e) { + LOG.warn("interrupted when wait copies to finish"); + } finally { + copyFileContext.unlock(); + } + return !copyFileContext.isCopyFailure(); } @Override diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java index 410adc90373..283927c828e 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -110,6 +110,22 @@ public final class Constants { "fs.oss.multipart.download.ahead.part.max.number"; public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4; + // The maximum queue number for copies + // New copies will be blocked when queue is full + public static final String MAX_COPY_TASKS_KEY = "fs.oss.max.copy.tasks"; + public static final int MAX_COPY_TASKS_DEFAULT = 1024 * 10240; + + // The maximum number of threads allowed in the pool for copies + public static final String MAX_COPY_THREADS_NUM_KEY = + "fs.oss.max.copy.threads"; + public static final int MAX_COPY_THREADS_DEFAULT = 25; + + // The maximum number of concurrent tasks allowed to copy one directory. + // So we will not block other copies + public static final String MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY = + "fs.oss.max.copy.tasks.per.dir"; + public static final int MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT = 5; + // Comma separated list of directories public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java index 46ab3392162..0570146fd96 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java @@ -33,6 +33,8 @@ import java.io.IOException; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeNotNull; import static org.junit.Assume.assumeTrue; @@ -132,6 +134,134 @@ public class TestAliyunOSSFileSystemContract } } + @Test + public void testRenameDirectoryConcurrent() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/file/"); + Path child1 = this.path("/test/hadoop/file/1"); + Path child2 = this.path("/test/hadoop/file/2"); + Path child3 = this.path("/test/hadoop/file/3"); + Path child4 = this.path("/test/hadoop/file/4"); + + this.createFile(child1); + this.createFile(child2); + this.createFile(child3); + this.createFile(child4); + + Path dst = this.path("/test/new"); + super.rename(src, dst, true, false, true); + assertEquals(4, this.fs.listStatus(dst).length); + } + + @Test + public void testRenameDirectoryCopyTaskAllSucceed() throws Exception { + assumeTrue(renameSupported()); + Path srcOne = this.path("/test/hadoop/file/1"); + this.createFile(srcOne); + + Path dstOne = this.path("/test/new/file/1"); + Path dstTwo = this.path("/test/new/file/2"); + AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext(); + AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); + store.storeEmptyFile("test/new/file/"); + AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstOne.toUri().getPath().substring(1), copyFileContext); + oneCopyFileTask.run(); + assumeFalse(copyFileContext.isCopyFailure()); + + AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstTwo.toUri().getPath().substring(1), copyFileContext); + twoCopyFileTask.run(); + assumeFalse(copyFileContext.isCopyFailure()); + + copyFileContext.lock(); + try { + copyFileContext.awaitAllFinish(2); + } catch (InterruptedException e) { + throw new Exception(e); + } finally { + copyFileContext.unlock(); + } + assumeFalse(copyFileContext.isCopyFailure()); + } + + @Test + public void testRenameDirectoryCopyTaskAllFailed() throws Exception { + assumeTrue(renameSupported()); + Path srcOne = this.path("/test/hadoop/file/1"); + this.createFile(srcOne); + + Path dstOne = new Path("1"); + Path dstTwo = new Path("2"); + AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext(); + AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); + //store.storeEmptyFile("test/new/file/"); + AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstOne.toUri().getPath().substring(1), copyFileContext); + oneCopyFileTask.run(); + assumeTrue(copyFileContext.isCopyFailure()); + + AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstTwo.toUri().getPath().substring(1), copyFileContext); + twoCopyFileTask.run(); + assumeTrue(copyFileContext.isCopyFailure()); + + copyFileContext.lock(); + try { + copyFileContext.awaitAllFinish(2); + } catch (InterruptedException e) { + throw new Exception(e); + } finally { + copyFileContext.unlock(); + } + assumeTrue(copyFileContext.isCopyFailure()); + } + + @Test + public void testRenameDirectoryCopyTaskPartialFailed() throws Exception { + assumeTrue(renameSupported()); + Path srcOne = this.path("/test/hadoop/file/1"); + this.createFile(srcOne); + + Path dstOne = new Path("1"); + Path dstTwo = new Path("/test/new/file/2"); + Path dstThree = new Path("3"); + AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext(); + AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); + //store.storeEmptyFile("test/new/file/"); + AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstOne.toUri().getPath().substring(1), copyFileContext); + oneCopyFileTask.run(); + assumeTrue(copyFileContext.isCopyFailure()); + + AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstTwo.toUri().getPath().substring(1), copyFileContext); + twoCopyFileTask.run(); + assumeTrue(copyFileContext.isCopyFailure()); + + AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask( + store, srcOne.toUri().getPath().substring(1), + dstThree.toUri().getPath().substring(1), copyFileContext); + threeCopyFileTask.run(); + assumeTrue(copyFileContext.isCopyFailure()); + + copyFileContext.lock(); + try { + copyFileContext.awaitAllFinish(3); + } catch (InterruptedException e) { + throw new Exception(e); + } finally { + copyFileContext.unlock(); + } + assumeTrue(copyFileContext.isCopyFailure()); + } + @Test public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception { assumeTrue(renameSupported());