HADOOP-15262. AliyunOSS: move files under a directory in parallel when rename a directory. Contributed by Jinhu Wu.

This commit is contained in:
Sammi Chen 2018-03-19 15:02:37 +08:00
parent 86816da5b4
commit d67a5e2dec
5 changed files with 330 additions and 2 deletions

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Used by {@link AliyunOSSFileSystem} and {@link AliyunOSSCopyFileTask}
* as copy context. It contains some variables used in copy process.
*/
public class AliyunOSSCopyFileContext {
private final ReentrantLock lock = new ReentrantLock();
private Condition readyCondition = lock.newCondition();
private boolean copyFailure;
private int copiesFinish;
public AliyunOSSCopyFileContext() {
copyFailure = false;
copiesFinish = 0;
}
public void lock() {
lock.lock();
}
public void unlock() {
lock.unlock();
}
public void awaitAllFinish(int copiesToFinish) throws InterruptedException {
while (this.copiesFinish != copiesToFinish) {
readyCondition.await();
}
}
public void signalAll() {
readyCondition.signalAll();
}
public boolean isCopyFailure() {
return copyFailure;
}
public void setCopyFailure(boolean copyFailure) {
this.copyFailure = copyFailure;
}
public void incCopiesFinish() {
++copiesFinish;
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Used by {@link AliyunOSSFileSystem} as an task that submitted
* to the thread pool to accelerate the copy progress.
* Each AliyunOSSCopyFileTask copies one file from src path to dst path
*/
public class AliyunOSSCopyFileTask implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSCopyFileTask.class);
private AliyunOSSFileSystemStore store;
private String srcKey;
private String dstKey;
private AliyunOSSCopyFileContext copyFileContext;
public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
this.store = store;
this.srcKey = srcKey;
this.dstKey = dstKey;
this.copyFileContext = copyFileContext;
}
@Override
public void run() {
boolean fail = false;
try {
store.copyFile(srcKey, dstKey);
} catch (Exception e) {
LOG.warn("Exception thrown when copy from "
+ srcKey + " to " + dstKey + ", exception: " + e);
fail = true;
} finally {
copyFileContext.lock();
if (fail) {
copyFileContext.setCopyFailure(fail);
}
copyFileContext.incCopiesFinish();
copyFileContext.signalAll();
copyFileContext.unlock();
}
}
}

View File

@ -24,9 +24,11 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@ -70,7 +72,9 @@ public class AliyunOSSFileSystem extends FileSystem {
private AliyunOSSFileSystemStore store;
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
@ -90,6 +94,7 @@ public class AliyunOSSFileSystem extends FileSystem {
try {
store.close();
boundedThreadPool.shutdown();
boundedCopyThreadPool.shutdown();
} finally {
super.close();
}
@ -331,6 +336,23 @@ public class AliyunOSSFileSystem extends FileSystem {
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT);
int maxCopyThreads = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_THREADS_NUM_KEY,
Constants.MAX_COPY_THREADS_DEFAULT);
int maxCopyTasks = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_COPY_TASKS_KEY,
Constants.MAX_COPY_TASKS_DEFAULT);
this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxCopyThreads, maxCopyTasks, 60L,
TimeUnit.SECONDS, "oss-copy-unbounded");
setConf(conf);
}
@ -653,14 +675,30 @@ public class AliyunOSSFileSystem extends FileSystem {
}
store.storeEmptyFile(dstKey);
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
ExecutorService executorService = MoreExecutors.listeningDecorator(
new SemaphoredDelegatingExecutor(boundedCopyThreadPool,
maxConcurrentCopyTasksPerDir, true));
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
statistics.incrementReadOps(1);
// Copy files from src folder to dst
int copiesToFinish = 0;
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String newKey =
dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
store.copyFile(objectSummary.getKey(), newKey);
//copy operation just copies metadata, oss will support shallow copy
executorService.execute(new AliyunOSSCopyFileTask(
store, objectSummary.getKey(), newKey, copyFileContext));
copiesToFinish++;
// No need to call lock() here.
// It's ok to copy one more file if the rename operation failed
// Reduce the call of lock() can also improve our performance
if (copyFileContext.isCopyFailure()) {
//some error occurs, break
break;
}
}
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
@ -670,7 +708,16 @@ public class AliyunOSSFileSystem extends FileSystem {
break;
}
}
return true;
//wait operations in progress to finish
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(copiesToFinish);
} catch (InterruptedException e) {
LOG.warn("interrupted when wait copies to finish");
} finally {
copyFileContext.unlock();
}
return !copyFileContext.isCopyFailure();
}
@Override

View File

@ -110,6 +110,22 @@ public final class Constants {
"fs.oss.multipart.download.ahead.part.max.number";
public static final int MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT = 4;
// The maximum queue number for copies
// New copies will be blocked when queue is full
public static final String MAX_COPY_TASKS_KEY = "fs.oss.max.copy.tasks";
public static final int MAX_COPY_TASKS_DEFAULT = 1024 * 10240;
// The maximum number of threads allowed in the pool for copies
public static final String MAX_COPY_THREADS_NUM_KEY =
"fs.oss.max.copy.threads";
public static final int MAX_COPY_THREADS_DEFAULT = 25;
// The maximum number of concurrent tasks allowed to copy one directory.
// So we will not block other copies
public static final String MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY =
"fs.oss.max.copy.tasks.per.dir";
public static final int MAX_CONCURRENT_COPY_TASKS_PER_DIR_DEFAULT = 5;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";

View File

@ -33,6 +33,8 @@ import java.io.IOException;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeNotNull;
import static org.junit.Assume.assumeTrue;
@ -132,6 +134,134 @@ public class TestAliyunOSSFileSystemContract
}
}
@Test
public void testRenameDirectoryConcurrent() throws Exception {
assumeTrue(renameSupported());
Path src = this.path("/test/hadoop/file/");
Path child1 = this.path("/test/hadoop/file/1");
Path child2 = this.path("/test/hadoop/file/2");
Path child3 = this.path("/test/hadoop/file/3");
Path child4 = this.path("/test/hadoop/file/4");
this.createFile(child1);
this.createFile(child2);
this.createFile(child3);
this.createFile(child4);
Path dst = this.path("/test/new");
super.rename(src, dst, true, false, true);
assertEquals(4, this.fs.listStatus(dst).length);
}
@Test
public void testRenameDirectoryCopyTaskAllSucceed() throws Exception {
assumeTrue(renameSupported());
Path srcOne = this.path("/test/hadoop/file/1");
this.createFile(srcOne);
Path dstOne = this.path("/test/new/file/1");
Path dstTwo = this.path("/test/new/file/2");
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeFalse(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeFalse(copyFileContext.isCopyFailure());
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(2);
} catch (InterruptedException e) {
throw new Exception(e);
} finally {
copyFileContext.unlock();
}
assumeFalse(copyFileContext.isCopyFailure());
}
@Test
public void testRenameDirectoryCopyTaskAllFailed() throws Exception {
assumeTrue(renameSupported());
Path srcOne = this.path("/test/hadoop/file/1");
this.createFile(srcOne);
Path dstOne = new Path("1");
Path dstTwo = new Path("2");
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
//store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(2);
} catch (InterruptedException e) {
throw new Exception(e);
} finally {
copyFileContext.unlock();
}
assumeTrue(copyFileContext.isCopyFailure());
}
@Test
public void testRenameDirectoryCopyTaskPartialFailed() throws Exception {
assumeTrue(renameSupported());
Path srcOne = this.path("/test/hadoop/file/1");
this.createFile(srcOne);
Path dstOne = new Path("1");
Path dstTwo = new Path("/test/new/file/2");
Path dstThree = new Path("3");
AliyunOSSCopyFileContext copyFileContext = new AliyunOSSCopyFileContext();
AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
//store.storeEmptyFile("test/new/file/");
AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstOne.toUri().getPath().substring(1), copyFileContext);
oneCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstTwo.toUri().getPath().substring(1), copyFileContext);
twoCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
store, srcOne.toUri().getPath().substring(1),
dstThree.toUri().getPath().substring(1), copyFileContext);
threeCopyFileTask.run();
assumeTrue(copyFileContext.isCopyFailure());
copyFileContext.lock();
try {
copyFileContext.awaitAllFinish(3);
} catch (InterruptedException e) {
throw new Exception(e);
} finally {
copyFileContext.unlock();
}
assumeTrue(copyFileContext.isCopyFailure());
}
@Test
public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
assumeTrue(renameSupported());