diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 4eac7d5314e..4bb640fd37d 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -483,7 +483,11 @@
hadoop-aws
${project.version}
-
+
+ org.apache.hadoop
+ hadoop-aliyun
+ ${project.version}
+
org.apache.hadoop
hadoop-kms
@@ -1078,6 +1082,22 @@
2.9.1
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+ 2.8.1
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ commons-beanutils
+ commons-beanutils
+
+
+
+
org.apache.curator
curator-recipes
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
new file mode 100644
index 00000000000..40d78d0cd6c
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
@@ -0,0 +1,18 @@
+
+
+
diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml
new file mode 100644
index 00000000000..357786b5b3d
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/pom.xml
@@ -0,0 +1,147 @@
+
+
+
+ 4.0.0
+
+ org.apache.hadoop
+ hadoop-project
+ 2.9.1-SNAPSHOT
+ ../../hadoop-project
+
+ hadoop-aliyun
+ Apache Hadoop Aliyun OSS support
+ jar
+
+
+ UTF-8
+ true
+
+
+
+
+ tests-off
+
+
+ src/test/resources/auth-keys.xml
+
+
+
+ true
+
+
+
+ tests-on
+
+
+ src/test/resources/auth-keys.xml
+
+
+
+ false
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ findbugs-maven-plugin
+
+ true
+ true
+ ${basedir}/dev-support/findbugs-exclude.xml
+
+ Max
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ 3600
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ deplist
+ compile
+
+ list
+
+
+
+ ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
+
+
+
+
+
+
+
+
+
+ junit
+ junit
+ test
+
+
+
+ com.aliyun.oss
+ aliyun-sdk-oss
+ compile
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ compile
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-distcp
+ test
+
+
+ org.apache.hadoop
+ hadoop-distcp
+ ${project.version}
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-tests
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-jobclient
+ test
+
+
+
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
new file mode 100644
index 00000000000..b46c67aa5e7
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.DefaultCredentials;
+import com.aliyun.oss.common.auth.InvalidCredentialsException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Support session credentials for authenticating with Aliyun.
+ */
+public class AliyunCredentialsProvider implements CredentialsProvider {
+ private Credentials credentials = null;
+
+ public AliyunCredentialsProvider(Configuration conf)
+ throws IOException {
+ String accessKeyId;
+ String accessKeySecret;
+ String securityToken;
+ try {
+ accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
+ accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
+ } catch (IOException e) {
+ throw new InvalidCredentialsException(e);
+ }
+
+ try {
+ securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN);
+ } catch (IOException e) {
+ securityToken = null;
+ }
+
+ if (StringUtils.isEmpty(accessKeyId)
+ || StringUtils.isEmpty(accessKeySecret)) {
+ throw new InvalidCredentialsException(
+ "AccessKeyId and AccessKeySecret should not be null or empty.");
+ }
+
+ if (StringUtils.isNotEmpty(securityToken)) {
+ credentials = new DefaultCredentials(accessKeyId, accessKeySecret,
+ securityToken);
+ } else {
+ credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
+ }
+ }
+
+ @Override
+ public void setCredentials(Credentials creds) {
+ if (creds == null) {
+ throw new InvalidCredentialsException("Credentials should not be null.");
+ }
+
+ credentials = creds;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ if (credentials == null) {
+ throw new InvalidCredentialsException("Invalid credentials");
+ }
+
+ return credentials;
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
new file mode 100644
index 00000000000..3561b0241ea
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.ObjectMetadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Implementation of {@link FileSystem} for
+ * Aliyun OSS, used to access OSS blob system in a filesystem style.
+ */
+public class AliyunOSSFileSystem extends FileSystem {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AliyunOSSFileSystem.class);
+ private URI uri;
+ private String bucket;
+ private Path workingDir;
+ private AliyunOSSFileSystemStore store;
+ private int maxKeys;
+
+ @Override
+ public FSDataOutputStream append(Path path, int bufferSize,
+ Progressable progress) throws IOException {
+ throw new IOException("Append is not supported!");
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ store.close();
+ } finally {
+ super.close();
+ }
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ String key = pathToKey(path);
+ FileStatus status = null;
+
+ try {
+ // get the status or throw a FNFE
+ status = getFileStatus(path);
+
+ // if the thread reaches here, there is something at the path
+ if (status.isDirectory()) {
+ // path references a directory
+ throw new FileAlreadyExistsException(path + " is a directory");
+ }
+ if (!overwrite) {
+ // path references a file and overwrite is disabled
+ throw new FileAlreadyExistsException(path + " already exists");
+ }
+ LOG.debug("Overwriting file {}", path);
+ } catch (FileNotFoundException e) {
+ // this means the file is not found
+ }
+
+ return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
+ store, key, progress, statistics), (Statistics)(null));
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws FileNotFoundException if the parent directory is not present -or
+ * is not a directory.
+ */
+ @Override
+ public FSDataOutputStream createNonRecursive(Path path,
+ FsPermission permission,
+ EnumSet flags,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ Path parent = path.getParent();
+ if (parent != null) {
+ // expect this to raise an exception if there is no parent
+ if (!getFileStatus(parent).isDirectory()) {
+ throw new FileAlreadyExistsException("Not a directory: " + parent);
+ }
+ }
+ return create(path, permission,
+ flags.contains(CreateFlag.OVERWRITE), bufferSize,
+ replication, blockSize, progress);
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ try {
+ return innerDelete(getFileStatus(path), recursive);
+ } catch (FileNotFoundException e) {
+ LOG.debug("Couldn't delete {} - does not exist", path);
+ return false;
+ }
+ }
+
+ /**
+ * Delete an object. See {@link #delete(Path, boolean)}.
+ *
+ * @param status fileStatus object
+ * @param recursive if path is a directory and set to
+ * true, the directory is deleted else throws an exception. In
+ * case of a file the recursive can be set to either true or false.
+ * @return true if delete is successful else false.
+ * @throws IOException due to inability to delete a directory or file.
+ */
+ private boolean innerDelete(FileStatus status, boolean recursive)
+ throws IOException {
+ Path f = status.getPath();
+ String p = f.toUri().getPath();
+ FileStatus[] statuses;
+ // indicating root directory "/".
+ if (p.equals("/")) {
+ statuses = listStatus(status.getPath());
+ boolean isEmptyDir = statuses.length <= 0;
+ return rejectRootDirectoryDelete(isEmptyDir, recursive);
+ }
+
+ String key = pathToKey(f);
+ if (status.isDirectory()) {
+ if (!recursive) {
+ // Check whether it is an empty directory or not
+ statuses = listStatus(status.getPath());
+ if (statuses.length > 0) {
+ throw new IOException("Cannot remove directory " + f +
+ ": It is not empty!");
+ } else {
+ // Delete empty directory without '-r'
+ key = AliyunOSSUtils.maybeAddTrailingSlash(key);
+ store.deleteObject(key);
+ }
+ } else {
+ store.deleteDirs(key);
+ }
+ } else {
+ store.deleteObject(key);
+ }
+
+ createFakeDirectoryIfNecessary(f);
+ return true;
+ }
+
+ /**
+ * Implements the specific logic to reject root directory deletion.
+ * The caller must return the result of this call, rather than
+ * attempt to continue with the delete operation: deleting root
+ * directories is never allowed. This method simply implements
+ * the policy of when to return an exit code versus raise an exception.
+ * @param isEmptyDir empty directory or not
+ * @param recursive recursive flag from command
+ * @return a return code for the operation
+ * @throws PathIOException if the operation was explicitly rejected.
+ */
+ private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
+ boolean recursive) throws IOException {
+ LOG.info("oss delete the {} root directory of {}", bucket, recursive);
+ if (isEmptyDir) {
+ return true;
+ }
+ if (recursive) {
+ return false;
+ } else {
+ // reject
+ throw new PathIOException(bucket, "Cannot delete root path");
+ }
+ }
+
+ private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+ String key = pathToKey(f);
+ if (StringUtils.isNotEmpty(key) && !exists(f)) {
+ LOG.debug("Creating new fake directory at {}", f);
+ mkdir(pathToKey(f.getParent()));
+ }
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ Path qualifiedPath = path.makeQualified(uri, workingDir);
+ String key = pathToKey(qualifiedPath);
+
+ // Root always exists
+ if (key.length() == 0) {
+ return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+ }
+
+ ObjectMetadata meta = store.getObjectMetadata(key);
+ // If key not found and key does not end with "/"
+ if (meta == null && !key.endsWith("/")) {
+ // In case of 'dir + "/"'
+ key += "/";
+ meta = store.getObjectMetadata(key);
+ }
+ if (meta == null) {
+ ObjectListing listing = store.listObjects(key, 1, null, false);
+ if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
+ CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
+ return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
+ } else {
+ throw new FileNotFoundException(path + ": No such file or directory!");
+ }
+ } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
+ return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
+ qualifiedPath);
+ } else {
+ return new FileStatus(meta.getContentLength(), false, 1,
+ getDefaultBlockSize(path), meta.getLastModified().getTime(),
+ qualifiedPath);
+ }
+ }
+
+ @Override
+ public String getScheme() {
+ return "oss";
+ }
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Deprecated
+ public long getDefaultBlockSize() {
+ return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
+ }
+
+ @Override
+ public String getCanonicalServiceName() {
+ // Does not support Token
+ return null;
+ }
+
+ /**
+ * Initialize new FileSystem.
+ *
+ * @param name the uri of the file system, including host, port, etc.
+ * @param conf configuration of the file system
+ * @throws IOException IO problems
+ */
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+
+ bucket = name.getHost();
+ uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
+ workingDir = new Path("/user",
+ System.getProperty("user.name")).makeQualified(uri, null);
+
+ store = new AliyunOSSFileSystemStore();
+ store.initialize(name, conf, statistics);
+ maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+ setConf(conf);
+ }
+
+ /**
+ * Check if OSS object represents a directory.
+ *
+ * @param name object key
+ * @param size object content length
+ * @return true if object represents a directory
+ */
+ private boolean objectRepresentsDirectory(final String name,
+ final long size) {
+ return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
+ }
+
+ /**
+ * Turn a path (relative or otherwise) into an OSS key.
+ *
+ * @param path the path of the file.
+ * @return the key of the object that represents the file.
+ */
+ private String pathToKey(Path path) {
+ if (!path.isAbsolute()) {
+ path = new Path(workingDir, path);
+ }
+
+ return path.toUri().getPath().substring(1);
+ }
+
+ private Path keyToPath(String key) {
+ return new Path("/" + key);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ String key = pathToKey(path);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("List status for path: " + path);
+ }
+
+ final List result = new ArrayList();
+ final FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus.isDirectory()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("listStatus: doing listObjects for directory " + key);
+ }
+
+ ObjectListing objects = store.listObjects(key, maxKeys, null, false);
+ while (true) {
+ statistics.incrementReadOps(1);
+ for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+ String objKey = objectSummary.getKey();
+ if (objKey.equals(key + "/")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring: " + objKey);
+ }
+ continue;
+ } else {
+ Path keyPath = keyToPath(objectSummary.getKey())
+ .makeQualified(uri, workingDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding: fi: " + keyPath);
+ }
+ result.add(new FileStatus(objectSummary.getSize(), false, 1,
+ getDefaultBlockSize(keyPath),
+ objectSummary.getLastModified().getTime(), keyPath));
+ }
+ }
+
+ for (String prefix : objects.getCommonPrefixes()) {
+ if (prefix.equals(key + "/")) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring: " + prefix);
+ }
+ continue;
+ } else {
+ Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding: rd: " + keyPath);
+ }
+ result.add(getFileStatus(keyPath));
+ }
+ }
+
+ if (objects.isTruncated()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("listStatus: list truncated - getting next batch");
+ }
+ String nextMarker = objects.getNextMarker();
+ objects = store.listObjects(key, maxKeys, nextMarker, false);
+ statistics.incrementReadOps(1);
+ } else {
+ break;
+ }
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding: rd (not a dir): " + path);
+ }
+ result.add(fileStatus);
+ }
+
+ return result.toArray(new FileStatus[result.size()]);
+ }
+
+ /**
+ * Used to create an empty file that represents an empty directory.
+ *
+ * @param key directory path
+ * @return true if directory is successfully created
+ * @throws IOException
+ */
+ private boolean mkdir(final String key) throws IOException {
+ String dirName = key;
+ if (StringUtils.isNotEmpty(key)) {
+ if (!key.endsWith("/")) {
+ dirName += "/";
+ }
+ store.storeEmptyFile(dirName);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission permission)
+ throws IOException {
+ try {
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus.isDirectory()) {
+ return true;
+ } else {
+ throw new FileAlreadyExistsException("Path is a file: " + path);
+ }
+ } catch (FileNotFoundException e) {
+ validatePath(path);
+ String key = pathToKey(path);
+ return mkdir(key);
+ }
+ }
+
+ /**
+ * Check whether the path is a valid path.
+ *
+ * @param path the path to be checked.
+ * @throws IOException
+ */
+ private void validatePath(Path path) throws IOException {
+ Path fPart = path.getParent();
+ do {
+ try {
+ FileStatus fileStatus = getFileStatus(fPart);
+ if (fileStatus.isDirectory()) {
+ // If path exists and a directory, exit
+ break;
+ } else {
+ throw new FileAlreadyExistsException(String.format(
+ "Can't make directory for path '%s', it is a file.", fPart));
+ }
+ } catch (FileNotFoundException fnfe) {
+ }
+ fPart = fPart.getParent();
+ } while (fPart != null);
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ final FileStatus fileStatus = getFileStatus(path);
+ if (fileStatus.isDirectory()) {
+ throw new FileNotFoundException("Can't open " + path +
+ " because it is a directory");
+ }
+
+ return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
+ pathToKey(path), fileStatus.getLen(), statistics));
+ }
+
+ @Override
+ public boolean rename(Path srcPath, Path dstPath) throws IOException {
+ if (srcPath.isRoot()) {
+ // Cannot rename root of file system
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot rename the root of a filesystem");
+ }
+ return false;
+ }
+ Path parent = dstPath.getParent();
+ while (parent != null && !srcPath.equals(parent)) {
+ parent = parent.getParent();
+ }
+ if (parent != null) {
+ return false;
+ }
+ FileStatus srcStatus = getFileStatus(srcPath);
+ FileStatus dstStatus;
+ try {
+ dstStatus = getFileStatus(dstPath);
+ } catch (FileNotFoundException fnde) {
+ dstStatus = null;
+ }
+ if (dstStatus == null) {
+ // If dst doesn't exist, check whether dst dir exists or not
+ dstStatus = getFileStatus(dstPath.getParent());
+ if (!dstStatus.isDirectory()) {
+ throw new IOException(String.format(
+ "Failed to rename %s to %s, %s is a file", srcPath, dstPath,
+ dstPath.getParent()));
+ }
+ } else {
+ if (srcStatus.getPath().equals(dstStatus.getPath())) {
+ return !srcStatus.isDirectory();
+ } else if (dstStatus.isDirectory()) {
+ // If dst is a directory
+ dstPath = new Path(dstPath, srcPath.getName());
+ FileStatus[] statuses;
+ try {
+ statuses = listStatus(dstPath);
+ } catch (FileNotFoundException fnde) {
+ statuses = null;
+ }
+ if (statuses != null && statuses.length > 0) {
+ // If dst exists and not a directory / not empty
+ throw new FileAlreadyExistsException(String.format(
+ "Failed to rename %s to %s, file already exists or not empty!",
+ srcPath, dstPath));
+ }
+ } else {
+ // If dst is not a directory
+ throw new FileAlreadyExistsException(String.format(
+ "Failed to rename %s to %s, file already exists!", srcPath,
+ dstPath));
+ }
+ }
+ if (srcStatus.isDirectory()) {
+ copyDirectory(srcPath, dstPath);
+ } else {
+ copyFile(srcPath, dstPath);
+ }
+
+ return srcPath.equals(dstPath) || delete(srcPath, true);
+ }
+
+ /**
+ * Copy file from source path to destination path.
+ * (the caller should make sure srcPath is a file and dstPath is valid)
+ *
+ * @param srcPath source path.
+ * @param dstPath destination path.
+ * @return true if file is successfully copied.
+ */
+ private boolean copyFile(Path srcPath, Path dstPath) {
+ String srcKey = pathToKey(srcPath);
+ String dstKey = pathToKey(dstPath);
+ return store.copyFile(srcKey, dstKey);
+ }
+
+ /**
+ * Copy a directory from source path to destination path.
+ * (the caller should make sure srcPath is a directory, and dstPath is valid)
+ *
+ * @param srcPath source path.
+ * @param dstPath destination path.
+ * @return true if directory is successfully copied.
+ */
+ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
+ String srcKey = AliyunOSSUtils
+ .maybeAddTrailingSlash(pathToKey(srcPath));
+ String dstKey = AliyunOSSUtils
+ .maybeAddTrailingSlash(pathToKey(dstPath));
+
+ if (dstKey.startsWith(srcKey)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot rename a directory to a subdirectory of self");
+ }
+ return false;
+ }
+
+ store.storeEmptyFile(dstKey);
+ ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
+ statistics.incrementReadOps(1);
+ // Copy files from src folder to dst
+ while (true) {
+ for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+ String newKey =
+ dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
+ store.copyFile(objectSummary.getKey(), newKey);
+ }
+ if (objects.isTruncated()) {
+ String nextMarker = objects.getNextMarker();
+ objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
+ statistics.incrementReadOps(1);
+ } else {
+ break;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void setWorkingDirectory(Path dir) {
+ this.workingDir = dir;
+ }
+
+ public AliyunOSSFileSystemStore getStore() {
+ return store;
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
new file mode 100644
index 00000000000..aba3db8cf8a
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -0,0 +1,549 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.ClientConfiguration;
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSSClient;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.comm.Protocol;
+import com.aliyun.oss.model.AbortMultipartUploadRequest;
+import com.aliyun.oss.model.CannedAccessControlList;
+import com.aliyun.oss.model.CompleteMultipartUploadRequest;
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
+import com.aliyun.oss.model.CopyObjectResult;
+import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.DeleteObjectsResult;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadRequest;
+import com.aliyun.oss.model.InitiateMultipartUploadResult;
+import com.aliyun.oss.model.ListObjectsRequest;
+import com.aliyun.oss.model.ObjectMetadata;
+import com.aliyun.oss.model.ObjectListing;
+import com.aliyun.oss.model.OSSObjectSummary;
+import com.aliyun.oss.model.PartETag;
+import com.aliyun.oss.model.PutObjectResult;
+import com.aliyun.oss.model.UploadPartCopyRequest;
+import com.aliyun.oss.model.UploadPartCopyResult;
+import com.aliyun.oss.model.UploadPartRequest;
+import com.aliyun.oss.model.UploadPartResult;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Core implementation of Aliyun OSS Filesystem for Hadoop.
+ * Provides the bridging logic between Hadoop's abstract filesystem and
+ * Aliyun OSS.
+ */
+public class AliyunOSSFileSystemStore {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
+ private FileSystem.Statistics statistics;
+ private OSSClient ossClient;
+ private String bucketName;
+ private long uploadPartSize;
+ private long multipartThreshold;
+ private long partSize;
+ private int maxKeys;
+ private String serverSideEncryptionAlgorithm;
+
+ public void initialize(URI uri, Configuration conf,
+ FileSystem.Statistics stat) throws IOException {
+ statistics = stat;
+ ClientConfiguration clientConf = new ClientConfiguration();
+ clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
+ MAXIMUM_CONNECTIONS_DEFAULT));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
+ SECURE_CONNECTIONS_DEFAULT);
+ clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
+ MAX_ERROR_RETRIES_DEFAULT));
+ clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
+ ESTABLISH_TIMEOUT_DEFAULT));
+ clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
+ SOCKET_TIMEOUT_DEFAULT));
+
+ String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
+ int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
+ if (StringUtils.isNotEmpty(proxyHost)) {
+ clientConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ clientConf.setProxyPort(proxyPort);
+ } else {
+ if (secureConnections) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ clientConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ clientConf.setProxyPort(80);
+ }
+ }
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
+ PROXY_PASSWORD_KEY + " set without the other.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ clientConf.setProxyUsername(proxyUsername);
+ clientConf.setProxyPassword(proxyPassword);
+ clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
+ clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
+ } else if (proxyPort >= 0) {
+ String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
+ PROXY_HOST_KEY;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+
+ String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
+ if (StringUtils.isEmpty(endPoint)) {
+ throw new IllegalArgumentException("Aliyun OSS endpoint should not be " +
+ "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
+ }
+ CredentialsProvider provider =
+ AliyunOSSUtils.getCredentialsProvider(conf);
+ ossClient = new OSSClient(endPoint, provider, clientConf);
+ uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+ MULTIPART_UPLOAD_SIZE_DEFAULT);
+ multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+ MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+ partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
+ MULTIPART_UPLOAD_SIZE_DEFAULT);
+ if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
+ partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+ }
+ serverSideEncryptionAlgorithm =
+ conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
+
+ if (uploadPartSize < 5 * 1024 * 1024) {
+ LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
+ uploadPartSize = 5 * 1024 * 1024;
+ }
+
+ if (multipartThreshold < 5 * 1024 * 1024) {
+ LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
+ multipartThreshold = 5 * 1024 * 1024;
+ }
+
+ if (multipartThreshold > 1024 * 1024 * 1024) {
+ LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
+ multipartThreshold = 1024 * 1024 * 1024;
+ }
+
+ String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
+ if (StringUtils.isNotEmpty(cannedACLName)) {
+ CannedAccessControlList cannedACL =
+ CannedAccessControlList.valueOf(cannedACLName);
+ ossClient.setBucketAcl(bucketName, cannedACL);
+ }
+
+ maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
+ bucketName = uri.getHost();
+ }
+
+ /**
+ * Delete an object, and update write operation statistics.
+ *
+ * @param key key to blob to delete.
+ */
+ public void deleteObject(String key) {
+ ossClient.deleteObject(bucketName, key);
+ statistics.incrementWriteOps(1);
+ }
+
+ /**
+ * Delete a list of keys, and update write operation statistics.
+ *
+ * @param keysToDelete collection of keys to delete.
+ * @throws IOException if failed to delete objects.
+ */
+ public void deleteObjects(List keysToDelete) throws IOException {
+ if (CollectionUtils.isEmpty(keysToDelete)) {
+ LOG.warn("Keys to delete is empty.");
+ return;
+ }
+
+ int retry = 10;
+ int tries = 0;
+ List deleteFailed = keysToDelete;
+ while(CollectionUtils.isNotEmpty(deleteFailed)) {
+ DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName);
+ deleteRequest.setKeys(deleteFailed);
+ // There are two modes to do batch delete:
+ // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects
+ // which were deleted successfully.
+ // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects
+ // which were deleted unsuccessfully.
+ // Here, we choose the simple mode to do batch delete.
+ deleteRequest.setQuiet(true);
+ DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
+ deleteFailed = result.getDeletedObjects();
+ tries++;
+ if (tries == retry) {
+ break;
+ }
+ }
+
+ if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) {
+ // Most of time, it is impossible to try 10 times, expect the
+ // Aliyun OSS service problems.
+ throw new IOException("Failed to delete Aliyun OSS objects for " +
+ tries + " times.");
+ }
+ }
+
+ /**
+ * Delete a directory from Aliyun OSS.
+ *
+ * @param key directory key to delete.
+ * @throws IOException if failed to delete directory.
+ */
+ public void deleteDirs(String key) throws IOException {
+ key = AliyunOSSUtils.maybeAddTrailingSlash(key);
+ ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+ listRequest.setPrefix(key);
+ listRequest.setDelimiter(null);
+ listRequest.setMaxKeys(maxKeys);
+
+ while (true) {
+ ObjectListing objects = ossClient.listObjects(listRequest);
+ statistics.incrementReadOps(1);
+ List keysToDelete = new ArrayList();
+ for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
+ keysToDelete.add(objectSummary.getKey());
+ }
+ deleteObjects(keysToDelete);
+ if (objects.isTruncated()) {
+ listRequest.setMarker(objects.getNextMarker());
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Return metadata of a given object key.
+ *
+ * @param key object key.
+ * @return return null if key does not exist.
+ */
+ public ObjectMetadata getObjectMetadata(String key) {
+ try {
+ return ossClient.getObjectMetadata(bucketName, key);
+ } catch (OSSException osse) {
+ return null;
+ } finally {
+ statistics.incrementReadOps(1);
+ }
+ }
+
+ /**
+ * Upload an empty file as an OSS object, using single upload.
+ *
+ * @param key object key.
+ * @throws IOException if failed to upload object.
+ */
+ public void storeEmptyFile(String key) throws IOException {
+ ObjectMetadata dirMeta = new ObjectMetadata();
+ byte[] buffer = new byte[0];
+ ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+ dirMeta.setContentLength(0);
+ try {
+ ossClient.putObject(bucketName, key, in, dirMeta);
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Copy an object from source key to destination key.
+ *
+ * @param srcKey source key.
+ * @param dstKey destination key.
+ * @return true if file is successfully copied.
+ */
+ public boolean copyFile(String srcKey, String dstKey) {
+ ObjectMetadata objectMeta =
+ ossClient.getObjectMetadata(bucketName, srcKey);
+ long contentLength = objectMeta.getContentLength();
+ if (contentLength <= multipartThreshold) {
+ return singleCopy(srcKey, dstKey);
+ } else {
+ return multipartCopy(srcKey, contentLength, dstKey);
+ }
+ }
+
+ /**
+ * Use single copy to copy an OSS object.
+ * (The caller should make sure srcPath is a file and dstPath is valid)
+ *
+ * @param srcKey source key.
+ * @param dstKey destination key.
+ * @return true if object is successfully copied.
+ */
+ private boolean singleCopy(String srcKey, String dstKey) {
+ CopyObjectResult copyResult =
+ ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
+ LOG.debug(copyResult.getETag());
+ return true;
+ }
+
+ /**
+ * Use multipart copy to copy an OSS object.
+ * (The caller should make sure srcPath is a file and dstPath is valid)
+ *
+ * @param srcKey source key.
+ * @param contentLength data size of the object to copy.
+ * @param dstKey destination key.
+ * @return true if success, or false if upload is aborted.
+ */
+ private boolean multipartCopy(String srcKey, long contentLength,
+ String dstKey) {
+ long realPartSize =
+ AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
+ int partNum = (int) (contentLength / realPartSize);
+ if (contentLength % realPartSize != 0) {
+ partNum++;
+ }
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+ new InitiateMultipartUploadRequest(bucketName, dstKey);
+ ObjectMetadata meta = new ObjectMetadata();
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+ }
+ initiateMultipartUploadRequest.setObjectMetadata(meta);
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
+ ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+ String uploadId = initiateMultipartUploadResult.getUploadId();
+ List partETags = new ArrayList();
+ try {
+ for (int i = 0; i < partNum; i++) {
+ long skipBytes = realPartSize * i;
+ long size = (realPartSize < contentLength - skipBytes) ?
+ realPartSize : contentLength - skipBytes;
+ UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
+ partCopyRequest.setSourceBucketName(bucketName);
+ partCopyRequest.setSourceKey(srcKey);
+ partCopyRequest.setBucketName(bucketName);
+ partCopyRequest.setKey(dstKey);
+ partCopyRequest.setUploadId(uploadId);
+ partCopyRequest.setPartSize(size);
+ partCopyRequest.setBeginIndex(skipBytes);
+ partCopyRequest.setPartNumber(i + 1);
+ UploadPartCopyResult partCopyResult =
+ ossClient.uploadPartCopy(partCopyRequest);
+ statistics.incrementWriteOps(1);
+ partETags.add(partCopyResult.getPartETag());
+ }
+ CompleteMultipartUploadRequest completeMultipartUploadRequest =
+ new CompleteMultipartUploadRequest(bucketName, dstKey,
+ uploadId, partETags);
+ CompleteMultipartUploadResult completeMultipartUploadResult =
+ ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+ LOG.debug(completeMultipartUploadResult.getETag());
+ return true;
+ } catch (OSSException | ClientException e) {
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
+ new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
+ ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+ return false;
+ }
+ }
+
+ /**
+ * Upload a file as an OSS object, using single upload.
+ *
+ * @param key object key.
+ * @param file local file to upload.
+ * @throws IOException if failed to upload object.
+ */
+ public void uploadObject(String key, File file) throws IOException {
+ File object = file.getAbsoluteFile();
+ FileInputStream fis = new FileInputStream(object);
+ ObjectMetadata meta = new ObjectMetadata();
+ meta.setContentLength(object.length());
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+ }
+ try {
+ PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
+ LOG.debug(result.getETag());
+ statistics.incrementWriteOps(1);
+ } finally {
+ fis.close();
+ }
+ }
+
+ /**
+ * Upload a file as an OSS object, using multipart upload.
+ *
+ * @param key object key.
+ * @param file local file to upload.
+ * @throws IOException if failed to upload object.
+ */
+ public void multipartUploadObject(String key, File file) throws IOException {
+ File object = file.getAbsoluteFile();
+ long dataLen = object.length();
+ long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
+ int partNum = (int) (dataLen / realPartSize);
+ if (dataLen % realPartSize != 0) {
+ partNum += 1;
+ }
+
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
+ new InitiateMultipartUploadRequest(bucketName, key);
+ ObjectMetadata meta = new ObjectMetadata();
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+ }
+ initiateMultipartUploadRequest.setObjectMetadata(meta);
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
+ ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
+ List partETags = new ArrayList();
+ String uploadId = initiateMultipartUploadResult.getUploadId();
+
+ try {
+ for (int i = 0; i < partNum; i++) {
+ //TODO Optimize this, avoid opening the object multiple times.
+ FileInputStream fis = new FileInputStream(object);
+ try {
+ long skipBytes = realPartSize * i;
+ AliyunOSSUtils.skipFully(fis, skipBytes);
+ long size = (realPartSize < dataLen - skipBytes) ?
+ realPartSize : dataLen - skipBytes;
+ UploadPartRequest uploadPartRequest = new UploadPartRequest();
+ uploadPartRequest.setBucketName(bucketName);
+ uploadPartRequest.setKey(key);
+ uploadPartRequest.setUploadId(uploadId);
+ uploadPartRequest.setInputStream(fis);
+ uploadPartRequest.setPartSize(size);
+ uploadPartRequest.setPartNumber(i + 1);
+ UploadPartResult uploadPartResult =
+ ossClient.uploadPart(uploadPartRequest);
+ statistics.incrementWriteOps(1);
+ partETags.add(uploadPartResult.getPartETag());
+ } finally {
+ fis.close();
+ }
+ }
+ CompleteMultipartUploadRequest completeMultipartUploadRequest =
+ new CompleteMultipartUploadRequest(bucketName, key,
+ uploadId, partETags);
+ CompleteMultipartUploadResult completeMultipartUploadResult =
+ ossClient.completeMultipartUpload(completeMultipartUploadRequest);
+ LOG.debug(completeMultipartUploadResult.getETag());
+ } catch (OSSException | ClientException e) {
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
+ new AbortMultipartUploadRequest(bucketName, key, uploadId);
+ ossClient.abortMultipartUpload(abortMultipartUploadRequest);
+ }
+ }
+
+ /**
+ * list objects.
+ *
+ * @param prefix prefix.
+ * @param maxListingLength max no. of entries
+ * @param marker last key in any previous search.
+ * @param recursive whether to list directory recursively.
+ * @return a list of matches.
+ */
+ public ObjectListing listObjects(String prefix, int maxListingLength,
+ String marker, boolean recursive) {
+ String delimiter = recursive ? null : "/";
+ prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
+ ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
+ listRequest.setPrefix(prefix);
+ listRequest.setDelimiter(delimiter);
+ listRequest.setMaxKeys(maxListingLength);
+ listRequest.setMarker(marker);
+
+ ObjectListing listing = ossClient.listObjects(listRequest);
+ statistics.incrementReadOps(1);
+ return listing;
+ }
+
+ /**
+ * Retrieve a part of an object.
+ *
+ * @param key the object name that is being retrieved from the Aliyun OSS.
+ * @param byteStart start position.
+ * @param byteEnd end position.
+ * @return This method returns null if the key is not found.
+ */
+ public InputStream retrieve(String key, long byteStart, long byteEnd) {
+ try {
+ GetObjectRequest request = new GetObjectRequest(bucketName, key);
+ request.setRange(byteStart, byteEnd);
+ return ossClient.getObject(request).getObjectContent();
+ } catch (OSSException | ClientException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Close OSS client properly.
+ */
+ public void close() {
+ if (ossClient != null) {
+ ossClient.shutdown();
+ ossClient = null;
+ }
+ }
+
+ /**
+ * Clean up all objects matching the prefix.
+ *
+ * @param prefix Aliyun OSS object prefix.
+ * @throws IOException if failed to clean up objects.
+ */
+ public void purge(String prefix) throws IOException {
+ String key;
+ try {
+ ObjectListing objects = listObjects(prefix, maxKeys, null, true);
+ for (OSSObjectSummary object : objects.getObjectSummaries()) {
+ key = object.getKey();
+ ossClient.deleteObject(bucketName, key);
+ }
+
+ for (String dir: objects.getCommonPrefixes()) {
+ deleteDirs(dir);
+ }
+ } catch (OSSException | ClientException e) {
+ LOG.error("Failed to purge " + prefix);
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
new file mode 100644
index 00000000000..3b2bc022e22
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+
+/**
+ * The input stream for OSS blob system.
+ * The class uses multi-part downloading to read data from the object content
+ * stream.
+ */
+public class AliyunOSSInputStream extends FSInputStream {
+ public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
+ private final long downloadPartSize;
+ private AliyunOSSFileSystemStore store;
+ private final String key;
+ private Statistics statistics;
+ private boolean closed;
+ private InputStream wrappedStream = null;
+ private long contentLength;
+ private long position;
+ private long partRemaining;
+
+ public AliyunOSSInputStream(Configuration conf,
+ AliyunOSSFileSystemStore store, String key, Long contentLength,
+ Statistics statistics) throws IOException {
+ this.store = store;
+ this.key = key;
+ this.statistics = statistics;
+ this.contentLength = contentLength;
+ downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
+ MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+ reopen(0);
+ closed = false;
+ }
+
+ /**
+ * Reopen the wrapped stream at give position, by seeking for
+ * data of a part length from object content stream.
+ *
+ * @param pos position from start of a file
+ * @throws IOException if failed to reopen
+ */
+ private synchronized void reopen(long pos) throws IOException {
+ long partSize;
+
+ if (pos < 0) {
+ throw new EOFException("Cannot seek at negative position:" + pos);
+ } else if (pos > contentLength) {
+ throw new EOFException("Cannot seek after EOF, contentLength:" +
+ contentLength + " position:" + pos);
+ } else if (pos + downloadPartSize > contentLength) {
+ partSize = contentLength - pos;
+ } else {
+ partSize = downloadPartSize;
+ }
+
+ if (wrappedStream != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Aborting old stream to open at pos " + pos);
+ }
+ wrappedStream.close();
+ }
+
+ wrappedStream = store.retrieve(key, pos, pos + partSize -1);
+ if (wrappedStream == null) {
+ throw new IOException("Null IO stream");
+ }
+ position = pos;
+ partRemaining = partSize;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ checkNotClosed();
+
+ if (partRemaining <= 0 && position < contentLength) {
+ reopen(position);
+ }
+
+ int tries = MAX_RETRIES;
+ boolean retry;
+ int byteRead = -1;
+ do {
+ retry = false;
+ try {
+ byteRead = wrappedStream.read();
+ } catch (Exception e) {
+ handleReadException(e, --tries);
+ retry = true;
+ }
+ } while (retry);
+ if (byteRead >= 0) {
+ position++;
+ partRemaining--;
+ }
+
+ if (statistics != null && byteRead >= 0) {
+ statistics.incrementBytesRead(byteRead);
+ }
+ return byteRead;
+ }
+
+
+ /**
+ * Verify that the input stream is open. Non blocking; this gives
+ * the last state of the volatile {@link #closed} field.
+ *
+ * @throws IOException if the connection is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+ checkNotClosed();
+
+ if (buf == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > buf.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ int bytesRead = 0;
+ // Not EOF, and read not done
+ while (position < contentLength && bytesRead < len) {
+ if (partRemaining == 0) {
+ reopen(position);
+ }
+
+ int tries = MAX_RETRIES;
+ boolean retry;
+ int bytes = -1;
+ do {
+ retry = false;
+ try {
+ bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
+ } catch (Exception e) {
+ handleReadException(e, --tries);
+ retry = true;
+ }
+ } while (retry);
+
+ if (bytes > 0) {
+ bytesRead += bytes;
+ position += bytes;
+ partRemaining -= bytes;
+ } else if (partRemaining != 0) {
+ throw new IOException("Failed to read from stream. Remaining:" +
+ partRemaining);
+ }
+ }
+
+ if (statistics != null && bytesRead > 0) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+
+ // Read nothing, but attempt to read something
+ if (bytesRead == 0 && len > 0) {
+ return -1;
+ } else {
+ return bytesRead;
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (wrappedStream != null) {
+ wrappedStream.close();
+ }
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ checkNotClosed();
+
+ long remaining = contentLength - position;
+ if (remaining > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int)remaining;
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ checkNotClosed();
+ if (position == pos) {
+ return;
+ } else if (pos > position && pos < position + partRemaining) {
+ long len = pos - position;
+ AliyunOSSUtils.skipFully(wrappedStream, len);
+ position = pos;
+ partRemaining -= len;
+ } else {
+ reopen(pos);
+ }
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ checkNotClosed();
+ return position;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ checkNotClosed();
+ return false;
+ }
+
+ private void handleReadException(Exception e, int tries) throws IOException{
+ if (tries == 0) {
+ throw new IOException(e);
+ }
+
+ LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
+ " connection at position '" + position + "', " + e.getMessage());
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e2) {
+ LOG.warn(e2.getMessage());
+ }
+ reopen(position);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
new file mode 100644
index 00000000000..c952d0ae858
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The output stream for OSS blob system.
+ * Data will be buffered on local disk, then uploaded to OSS in
+ * {@link #close()} method.
+ */
+public class AliyunOSSOutputStream extends OutputStream {
+ public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
+ private AliyunOSSFileSystemStore store;
+ private final String key;
+ private Statistics statistics;
+ private Progressable progress;
+ private long partSizeThreshold;
+ private LocalDirAllocator dirAlloc;
+ private boolean closed;
+ private File tmpFile;
+ private BufferedOutputStream backupStream;
+
+ public AliyunOSSOutputStream(Configuration conf,
+ AliyunOSSFileSystemStore store, String key, Progressable progress,
+ Statistics statistics) throws IOException {
+ this.store = store;
+ this.key = key;
+ // The caller cann't get any progress information
+ this.progress = progress;
+ this.statistics = statistics;
+ partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
+ MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
+ if (conf.get(BUFFER_DIR_KEY) == null) {
+ conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
+ }
+ dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
+
+ tmpFile = dirAlloc.createTmpFileForWrite("output-",
+ LocalDirAllocator.SIZE_UNKNOWN, conf);
+ backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
+ closed = false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (backupStream != null) {
+ backupStream.close();
+ }
+ long dataLen = tmpFile.length();
+ try {
+ if (dataLen <= partSizeThreshold) {
+ store.uploadObject(key, tmpFile);
+ } else {
+ store.multipartUploadObject(key, tmpFile);
+ }
+ } finally {
+ if (!tmpFile.delete()) {
+ LOG.warn("Can not delete file: " + tmpFile);
+ }
+ }
+ }
+
+
+
+ @Override
+ public synchronized void flush() throws IOException {
+ backupStream.flush();
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ backupStream.write(b);
+ statistics.incrementBytesWritten(1);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
new file mode 100644
index 00000000000..263b4cf8880
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ProviderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Utility methods for Aliyun OSS code.
+ */
+public final class AliyunOSSUtils {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AliyunOSSUtils.class);
+
+ private AliyunOSSUtils() {
+ }
+
+ /**
+ * Used to get password from configuration.
+ *
+ * @param conf configuration that contains password information
+ * @param key the key of the password
+ * @return the value for the key
+ * @throws IOException if failed to get password from configuration
+ */
+ public static String getValueWithKey(Configuration conf, String key)
+ throws IOException {
+ try {
+ final char[] pass = conf.getPassword(key);
+ if (pass != null) {
+ return (new String(pass)).trim();
+ } else {
+ return "";
+ }
+ } catch (IOException ioe) {
+ throw new IOException("Cannot find password option " + key, ioe);
+ }
+ }
+
+ /**
+ * Skip the requested number of bytes or fail if there are no enough bytes
+ * left. This allows for the possibility that {@link InputStream#skip(long)}
+ * may not skip as many bytes as requested (most likely because of reaching
+ * EOF).
+ *
+ * @param is the input stream to skip.
+ * @param n the number of bytes to skip.
+ * @throws IOException thrown when skipped less number of bytes.
+ */
+ public static void skipFully(InputStream is, long n) throws IOException {
+ long total = 0;
+ long cur = 0;
+
+ do {
+ cur = is.skip(n - total);
+ total += cur;
+ } while((total < n) && (cur > 0));
+
+ if (total < n) {
+ throw new IOException("Failed to skip " + n + " bytes, possibly due " +
+ "to EOF.");
+ }
+ }
+
+ /**
+ * Calculate a proper size of multipart piece. If minPartSize
+ * is too small, the number of multipart pieces may exceed the limit of
+ * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
+ *
+ * @param contentLength the size of file.
+ * @param minPartSize the minimum size of multipart piece.
+ * @return a revisional size of multipart piece.
+ */
+ public static long calculatePartSize(long contentLength, long minPartSize) {
+ long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
+ return Math.max(minPartSize, tmpPartSize);
+ }
+
+ /**
+ * Create credential provider specified by configuration, or create default
+ * credential provider if not specified.
+ *
+ * @param conf configuration
+ * @return a credential provider
+ * @throws IOException on any problem. Class construction issues may be
+ * nested inside the IOE.
+ */
+ public static CredentialsProvider getCredentialsProvider(Configuration conf)
+ throws IOException {
+ CredentialsProvider credentials;
+
+ String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
+ if (StringUtils.isEmpty(className)) {
+ Configuration newConf =
+ ProviderUtils.excludeIncompatibleCredentialProviders(conf,
+ AliyunOSSFileSystem.class);
+ credentials = new AliyunCredentialsProvider(newConf);
+ } else {
+ try {
+ LOG.debug("Credential provider class is:" + className);
+ Class> credClass = Class.forName(className);
+ try {
+ credentials =
+ (CredentialsProvider)credClass.getDeclaredConstructor(
+ Configuration.class).newInstance(conf);
+ } catch (NoSuchMethodException | SecurityException e) {
+ credentials =
+ (CredentialsProvider)credClass.getDeclaredConstructor()
+ .newInstance();
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOException(className + " not found.", e);
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new IOException(String.format("%s constructor exception. A " +
+ "class specified in %s must provide an accessible constructor " +
+ "accepting URI and Configuration, or an accessible default " +
+ "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
+ e);
+ } catch (ReflectiveOperationException | IllegalArgumentException e) {
+ throw new IOException(className + " instantiation exception.", e);
+ }
+ }
+
+ return credentials;
+ }
+
+ /**
+ * Turns a path (relative or otherwise) into an OSS key, adding a trailing
+ * "/" if the path is not the root and does not already have a "/"
+ * at the end.
+ *
+ * @param key OSS key or ""
+ * @return the with a trailing "/", or, if it is the root key, "".
+ */
+ public static String maybeAddTrailingSlash(String key) {
+ if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) {
+ return key + '/';
+ } else {
+ return key;
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
new file mode 100644
index 00000000000..04a2ccd6c54
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+/**
+ * ALL configuration constants for OSS filesystem.
+ */
+public final class Constants {
+
+ private Constants() {
+ }
+
+ // Class of credential provider
+ public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
+ "fs.oss.credentials.provider";
+
+ // OSS access verification
+ public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
+ public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+ public static final String SECURITY_TOKEN = "fs.oss.securityToken";
+
+ // Number of simultaneous connections to oss
+ public static final String MAXIMUM_CONNECTIONS_KEY =
+ "fs.oss.connection.maximum";
+ public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
+
+ // Connect to oss over ssl
+ public static final String SECURE_CONNECTIONS_KEY =
+ "fs.oss.connection.secure.enabled";
+ public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
+
+ // Use a custom endpoint
+ public static final String ENDPOINT_KEY = "fs.oss.endpoint";
+
+ // Connect to oss through a proxy server
+ public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
+ public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
+ public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
+ public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
+ public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
+ public static final String PROXY_WORKSTATION_KEY =
+ "fs.oss.proxy.workstation";
+
+ // Number of times we should retry errors
+ public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
+ public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
+
+ // Time until we give up trying to establish a connection to oss
+ public static final String ESTABLISH_TIMEOUT_KEY =
+ "fs.oss.connection.establish.timeout";
+ public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
+
+ // Time until we give up on a connection to oss
+ public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
+ public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
+
+ // Number of records to get while paging through a directory listing
+ public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
+ public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
+
+ // Size of each of or multipart pieces in bytes
+ public static final String MULTIPART_UPLOAD_SIZE_KEY =
+ "fs.oss.multipart.upload.size";
+
+ public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
+ public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
+
+ // Minimum size in bytes before we start a multipart uploads or copy
+ public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
+ "fs.oss.multipart.upload.threshold";
+ public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
+ 20 * 1024 * 1024;
+
+ public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
+ "fs.oss.multipart.download.size";
+
+ public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
+
+ // Comma separated list of directories
+ public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
+
+ // private | public-read | public-read-write
+ public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
+ public static final String CANNED_ACL_DEFAULT = "";
+
+ // OSS server-side encryption
+ public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
+ "fs.oss.server-side-encryption-algorithm";
+
+ public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
+ public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
+ public static final String FS_OSS = "oss";
+
+ public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
+ public static final int MAX_RETRIES = 10;
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
new file mode 100644
index 00000000000..234567b2b00
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Aliyun OSS Filesystem.
+ */
+package org.apache.hadoop.fs.aliyun.oss;
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
new file mode 100644
index 00000000000..62e650506ff
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md
@@ -0,0 +1,294 @@
+
+
+# Hadoop-Aliyun module: Integration with Aliyun Web Services
+
+
+
+## Overview
+
+The `hadoop-aliyun` module provides support for Aliyun integration with
+[Aliyun Object Storage Service (Aliyun OSS)](https://www.aliyun.com/product/oss).
+The generated JAR file, `hadoop-aliyun.jar` also declares a transitive
+dependency on all external artifacts which are needed for this support — enabling
+downstream applications to easily use this support.
+
+To make it part of Apache Hadoop's default classpath, simply make sure
+that HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aliyun' in the list.
+
+### Features
+
+* Read and write data stored in Aliyun OSS.
+* Present a hierarchical file system view by implementing the standard Hadoop
+[`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface.
+* Can act as a source of data in a MapReduce job, or a sink.
+
+### Warning #1: Object Stores are not filesystems.
+
+Aliyun OSS is an example of "an object store". In order to achieve scalability
+and especially high availability, Aliyun OSS has relaxed some of the constraints
+which classic "POSIX" filesystems promise.
+
+
+
+Specifically
+
+1. Atomic operations: `delete()` and `rename()` are implemented by recursive
+file-by-file operations. They take time at least proportional to the number of files,
+during which time partial updates may be visible. `delete()` and `rename()`
+can not guarantee atomicity. If the operations are interrupted, the filesystem
+is left in an intermediate state.
+2. File owner and group are persisted, but the permissions model is not enforced.
+Authorization occurs at the level of the entire Aliyun account via
+[Aliyun Resource Access Management (Aliyun RAM)](https://www.aliyun.com/product/ram).
+3. Directory last access time is not tracked.
+4. The append operation is not supported.
+
+### Warning #2: Directory last access time is not tracked,
+features of Hadoop relying on this can have unexpected behaviour. E.g. the
+AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
+
+### Warning #3: Your Aliyun credentials are valuable
+
+Your Aliyun credentials not only pay for services, they offer read and write
+access to the data. Anyone with the account can not only read your datasets
+—they can delete them.
+
+Do not inadvertently share these credentials through means such as
+1. Checking in to SCM any configuration files containing the secrets.
+2. Logging them to a console, as they invariably end up being seen.
+3. Defining filesystem URIs with the credentials in the URL, such as
+`oss://accessKeyId:accessKeySecret@directory/file`. They will end up in
+logs and error messages.
+4. Including the secrets in bug reports.
+
+If you do any of these: change your credentials immediately!
+
+### Warning #4: The Aliyun OSS client provided by Aliyun E-MapReduce are different from this implementation
+
+Specifically: on Aliyun E-MapReduce, `oss://` is also supported but with
+a different implementation. If you are using Aliyun E-MapReduce,
+follow these instructions —and be aware that all issues related to Aliyun
+OSS integration in E-MapReduce can only be addressed by Aliyun themselves:
+please raise your issues with them.
+
+## OSS
+
+### Authentication properties
+
+
+ fs.oss.accessKeyId
+ Aliyun access key ID
+
+
+
+ fs.oss.accessKeySecret
+ Aliyun access key secret
+
+
+
+ fs.oss.credentials.provider
+
+ Class name of a credentials provider that implements
+ com.aliyun.oss.common.auth.CredentialsProvider. Omit if using access/secret keys
+ or another authentication mechanism. The specified class must provide an
+ accessible constructor accepting java.net.URI and
+ org.apache.hadoop.conf.Configuration, or an accessible default constructor.
+
+
+
+### Other properties
+
+
+ fs.oss.endpoint
+ Aliyun OSS endpoint to connect to. An up-to-date list is
+ provided in the Aliyun OSS Documentation.
+
+
+
+
+ fs.oss.proxy.host
+ Hostname of the (optinal) proxy server for Aliyun OSS connection
+
+
+
+ fs.oss.proxy.port
+ Proxy server port
+
+
+
+ fs.oss.proxy.username
+ Username for authenticating with proxy server
+
+
+
+ fs.oss.proxy.password
+ Password for authenticating with proxy server.
+
+
+
+ fs.oss.proxy.domain
+ Domain for authenticating with proxy server.
+
+
+
+ fs.oss.proxy.workstation
+ Workstation for authenticating with proxy server.
+
+
+
+ fs.oss.attempts.maximum
+ 20
+ How many times we should retry commands on transient errors.
+
+
+
+ fs.oss.connection.establish.timeout
+ 50000
+ Connection setup timeout in milliseconds.
+
+
+
+ fs.oss.connection.timeout
+ 200000
+ Socket connection timeout in milliseconds.
+
+
+
+ fs.oss.paging.maximum
+ 1000
+ How many keys to request from Aliyun OSS when doing directory listings at a time.
+
+
+
+
+ fs.oss.multipart.upload.size
+ 10485760
+ Size of each of multipart pieces in bytes.
+
+
+
+ fs.oss.multipart.upload.threshold
+ 20971520
+ Minimum size in bytes before we start a multipart uploads or copy.
+
+
+
+ fs.oss.multipart.download.size
+ 102400/value>
+ Size in bytes in each request from ALiyun OSS.
+
+
+
+ fs.oss.buffer.dir
+ Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS
+
+
+
+ fs.oss.acl.default
+
+ Set a canned ACL for bucket. Value may be private, public-read, public-read-write.
+
+
+
+
+ fs.oss.server-side-encryption-algorithm
+
+ Specify a server-side encryption algorithm for oss: file system.
+ Unset by default, and the only other currently allowable value is AES256.
+
+
+
+
+ fs.oss.connection.maximum
+ 32
+ Number of simultaneous connections to oss.
+
+
+
+ fs.oss.connection.secure.enabled
+ true
+ Connect to oss over ssl or not, true by default.
+
+
+## Testing the hadoop-aliyun Module
+
+To test `oss://` filesystem client, two files which pass in authentication
+details to the test runner are needed.
+
+1. `auth-keys.xml`
+2. `core-site.xml`
+
+Those two configuration files must be put into
+`hadoop-tools/hadoop-aliyun/src/test/resources`.
+
+### `core-site.xml`
+
+This file pre-exists and sources the configurations created in `auth-keys.xml`.
+
+For most cases, no modification is needed, unless a specific, non-default property
+needs to be set during the testing.
+
+### `auth-keys.xml`
+
+This file triggers the testing of Aliyun OSS module. Without this file,
+*none of the tests in this module will be executed*
+
+It contains the access key Id/secret and proxy information that are needed to
+connect to Aliyun OSS, and an OSS bucket URL should be also provided.
+
+1. `test.fs.oss.name` : the URL of the bucket for Aliyun OSS tests
+
+The contents of the bucket will be cleaned during the testing process, so
+do not use the bucket for any purpose other than testing.
+
+### Run Hadoop contract tests
+Create file `contract-test-options.xml` under `/test/resources`. If a
+specific file `fs.contract.test.fs.oss` test path is not defined, those
+tests will be skipped. Credentials are also needed to run any of those
+tests, they can be copied from `auth-keys.xml` or through direct
+XInclude inclusion. Here is an example of `contract-test-options.xml`:
+
+
+
+
+
+
+
+
+ fs.contract.test.fs.oss
+ oss://spark-tests
+
+
+
+ fs.oss.impl
+ org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
+
+
+
+ fs.oss.endpoint
+ oss-cn-hangzhou.aliyuncs.com
+
+
+
+ fs.oss.buffer.dir
+ /tmp/oss
+
+
+
+ fs.oss.multipart.download.size
+ 102400
+
+
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java
new file mode 100644
index 00000000000..901cb2bd082
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.internal.AssumptionViolatedException;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Utility class for Aliyun OSS Tests.
+ */
+public final class AliyunOSSTestUtils {
+
+ private AliyunOSSTestUtils() {
+ }
+
+ /**
+ * Create the test filesystem.
+ *
+ * If the test.fs.oss.name property is not set,
+ * tests will fail.
+ *
+ * @param conf configuration
+ * @return the FS
+ * @throws IOException
+ */
+ public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
+ throws IOException {
+ String fsname = conf.getTrimmed(
+ TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
+
+ boolean liveTest = StringUtils.isNotEmpty(fsname);
+ URI testURI = null;
+ if (liveTest) {
+ testURI = URI.create(fsname);
+ liveTest = testURI.getScheme().equals(Constants.FS_OSS);
+ }
+
+ if (!liveTest) {
+ throw new AssumptionViolatedException("No test filesystem in "
+ + TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME);
+ }
+ AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
+ ossfs.initialize(testURI, conf);
+ return ossfs;
+ }
+
+ /**
+ * Generate unique test path for multiple user tests.
+ *
+ * @return root test path
+ */
+ public static String generateUniqueTestPath() {
+ String testUniqueForkId = System.getProperty("test.unique.fork.id");
+ return testUniqueForkId == null ? "/test" :
+ "/" + testUniqueForkId + "/test";
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java
new file mode 100644
index 00000000000..e08a4dccfca
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.InvalidCredentialsException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
+
+/**
+ * Tests use of temporary credentials (for example, Aliyun STS & Aliyun OSS).
+ * This test extends a class that "does things to the root directory", and
+ * should only be used against transient filesystems where you don't care about
+ * the data.
+ */
+public class TestAliyunCredentials extends AbstractFSContractTestBase {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+ @Test
+ public void testCredentialMissingAccessKeyId() throws Throwable {
+ Configuration conf = new Configuration();
+ conf.set(ACCESS_KEY_ID, "");
+ conf.set(ACCESS_KEY_SECRET, "accessKeySecret");
+ conf.set(SECURITY_TOKEN, "token");
+ validateCredential(conf);
+ }
+
+ @Test
+ public void testCredentialMissingAccessKeySecret() throws Throwable {
+ Configuration conf = new Configuration();
+ conf.set(ACCESS_KEY_ID, "accessKeyId");
+ conf.set(ACCESS_KEY_SECRET, "");
+ conf.set(SECURITY_TOKEN, "token");
+ validateCredential(conf);
+ }
+
+ private void validateCredential(Configuration conf) {
+ try {
+ AliyunCredentialsProvider provider
+ = new AliyunCredentialsProvider(conf);
+ Credentials credentials = provider.getCredentials();
+ fail("Expected a CredentialInitializationException, got " + credentials);
+ } catch (InvalidCredentialsException expected) {
+ // expected
+ } catch (IOException e) {
+ fail("Unexpected exception.");
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
new file mode 100644
index 00000000000..19120a62e22
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeNotNull;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests a live Aliyun OSS system.
+ */
+public class TestAliyunOSSFileSystemContract
+ extends FileSystemContractBaseTest {
+ public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
+ private static Path testRootPath =
+ new Path(AliyunOSSTestUtils.generateUniqueTestPath());
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ fs = AliyunOSSTestUtils.createTestFileSystem(conf);
+ assumeNotNull(fs);
+ }
+
+ public Path getTestBaseDir() {
+ return testRootPath;
+ }
+
+ @Test
+ public void testMkdirsWithUmask() throws Exception {
+ // not supported
+ }
+
+ @Test
+ public void testRootDirAlwaysExists() throws Exception {
+ //this will throw an exception if the path is not found
+ fs.getFileStatus(super.path("/"));
+ //this catches overrides of the base exists() method that don't
+ //use getFileStatus() as an existence probe
+ assertTrue("FileSystem.exists() fails for root",
+ fs.exists(super.path("/")));
+ }
+
+ @Test
+ public void testRenameRootDirForbidden() throws Exception {
+ assumeTrue(renameSupported());
+ rename(super.path("/"),
+ super.path("/test/newRootDir"),
+ false, true, false);
+ }
+
+ @Test
+ public void testDeleteSubdir() throws IOException {
+ Path parentDir = this.path("/test/hadoop");
+ Path file = this.path("/test/hadoop/file");
+ Path subdir = this.path("/test/hadoop/subdir");
+ this.createFile(file);
+
+ assertTrue("Created subdir", this.fs.mkdirs(subdir));
+ assertTrue("File exists", this.fs.exists(file));
+ assertTrue("Parent dir exists", this.fs.exists(parentDir));
+ assertTrue("Subdir exists", this.fs.exists(subdir));
+
+ assertTrue("Deleted subdir", this.fs.delete(subdir, true));
+ assertTrue("Parent should exist", this.fs.exists(parentDir));
+
+ assertTrue("Deleted file", this.fs.delete(file, false));
+ assertTrue("Parent should exist", this.fs.exists(parentDir));
+ }
+
+
+ @Override
+ protected boolean renameSupported() {
+ return true;
+ }
+
+ @Test
+ public void testRenameNonExistentPath() throws Exception {
+ assumeTrue(renameSupported());
+ Path src = this.path("/test/hadoop/path");
+ Path dst = this.path("/test/new/newpath");
+ try {
+ super.rename(src, dst, false, false, false);
+ fail("Should throw FileNotFoundException!");
+ } catch (FileNotFoundException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testRenameFileMoveToNonExistentDirectory() throws Exception {
+ assumeTrue(renameSupported());
+ Path src = this.path("/test/hadoop/file");
+ this.createFile(src);
+ Path dst = this.path("/test/new/newfile");
+ try {
+ super.rename(src, dst, false, true, false);
+ fail("Should throw FileNotFoundException!");
+ } catch (FileNotFoundException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
+ assumeTrue(renameSupported());
+ Path src = this.path("/test/hadoop/dir");
+ this.fs.mkdirs(src);
+ Path dst = this.path("/test/new/newdir");
+ try {
+ super.rename(src, dst, false, true, false);
+ fail("Should throw FileNotFoundException!");
+ } catch (FileNotFoundException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testRenameFileMoveToExistingDirectory() throws Exception {
+ super.testRenameFileMoveToExistingDirectory();
+ }
+
+ @Test
+ public void testRenameFileAsExistingFile() throws Exception {
+ assumeTrue(renameSupported());
+ Path src = this.path("/test/hadoop/file");
+ this.createFile(src);
+ Path dst = this.path("/test/new/newfile");
+ this.createFile(dst);
+ try {
+ super.rename(src, dst, false, true, true);
+ fail("Should throw FileAlreadyExistsException");
+ } catch (FileAlreadyExistsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testRenameDirectoryAsExistingFile() throws Exception {
+ assumeTrue(renameSupported());
+ Path src = this.path("/test/hadoop/dir");
+ this.fs.mkdirs(src);
+ Path dst = this.path("/test/new/newfile");
+ this.createFile(dst);
+ try {
+ super.rename(src, dst, false, true, true);
+ fail("Should throw FileAlreadyExistsException");
+ } catch (FileAlreadyExistsException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testGetFileStatusFileAndDirectory() throws Exception {
+ Path filePath = this.path("/test/oss/file1");
+ this.createFile(filePath);
+ assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile());
+ assertFalse("Should not be directory",
+ this.fs.getFileStatus(filePath).isDirectory());
+
+ Path dirPath = this.path("/test/oss/dir");
+ this.fs.mkdirs(dirPath);
+ assertTrue("Should be directory",
+ this.fs.getFileStatus(dirPath).isDirectory());
+ assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile());
+
+ Path parentPath = this.path("/test/oss");
+ for (FileStatus fileStatus: fs.listStatus(parentPath)) {
+ assertTrue("file and directory should be new",
+ fileStatus.getModificationTime() > 0L);
+ }
+ }
+
+ @Test
+ public void testMkdirsForExistingFile() throws Exception {
+ Path testFile = this.path("/test/hadoop/file");
+ assertFalse(this.fs.exists(testFile));
+ this.createFile(testFile);
+ assertTrue(this.fs.exists(testFile));
+ try {
+ this.fs.mkdirs(testFile);
+ fail("Should throw FileAlreadyExistsException!");
+ } catch (FileAlreadyExistsException e) {
+ // expected
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
new file mode 100644
index 00000000000..7f4bac25642
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeNotNull;
+
+/**
+ * Test the bridging logic between Hadoop's abstract filesystem and
+ * Aliyun OSS.
+ */
+public class TestAliyunOSSFileSystemStore {
+ private Configuration conf;
+ private AliyunOSSFileSystemStore store;
+ private AliyunOSSFileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ fs = new AliyunOSSFileSystem();
+ fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf);
+ store = fs.getStore();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ store.purge("test");
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @BeforeClass
+ public static void checkSettings() throws Exception {
+ Configuration conf = new Configuration();
+ assumeNotNull(conf.get(Constants.ACCESS_KEY_ID));
+ assumeNotNull(conf.get(Constants.ACCESS_KEY_SECRET));
+ assumeNotNull(conf.get("test.fs.oss.name"));
+ }
+
+ protected void writeRenameReadCompare(Path path, long len)
+ throws IOException, NoSuchAlgorithmException {
+ // If len > fs.oss.multipart.upload.threshold,
+ // we'll use a multipart upload copy
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ OutputStream out = new BufferedOutputStream(
+ new DigestOutputStream(fs.create(path, false), digest));
+ for (long i = 0; i < len; i++) {
+ out.write('Q');
+ }
+ out.flush();
+ out.close();
+
+ assertTrue("Exists", fs.exists(path));
+
+ Path copyPath = path.suffix(".copy");
+ fs.rename(path, copyPath);
+
+ assertTrue("Copy exists", fs.exists(copyPath));
+
+ // Download file from Aliyun OSS and compare the digest against the original
+ MessageDigest digest2 = MessageDigest.getInstance("MD5");
+ InputStream in = new BufferedInputStream(
+ new DigestInputStream(fs.open(copyPath), digest2));
+ long copyLen = 0;
+ while (in.read() != -1) {
+ copyLen++;
+ }
+ in.close();
+
+ assertEquals("Copy length matches original", len, copyLen);
+ assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+ }
+
+ @Test
+ public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+ // Regular upload, regular copy
+ writeRenameReadCompare(new Path("/test/small"), 16384);
+ }
+
+ @Test
+ public void testLargeUpload()
+ throws IOException, NoSuchAlgorithmException {
+ // Multipart upload, multipart copy
+ writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
new file mode 100644
index 00000000000..d798cafb206
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests basic functionality for AliyunOSSInputStream, including seeking and
+ * reading files.
+ */
+public class TestAliyunOSSInputStream {
+
+ private FileSystem fs;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestAliyunOSSInputStream.class);
+
+ private static String testRootPath =
+ AliyunOSSTestUtils.generateUniqueTestPath();
+
+ @Rule
+ public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ fs = AliyunOSSTestUtils.createTestFileSystem(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.delete(new Path(testRootPath), true);
+ }
+ }
+
+ private Path setPath(String path) {
+ if (path.startsWith("/")) {
+ return new Path(testRootPath + path);
+ } else {
+ return new Path(testRootPath + "/" + path);
+ }
+ }
+
+ @Test
+ public void testSeekFile() throws Exception {
+ Path smallSeekFile = setPath("/test/smallSeekFile.txt");
+ long size = 5 * 1024 * 1024;
+
+ ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
+ LOG.info("5MB file created: smallSeekFile.txt");
+
+ FSDataInputStream instream = this.fs.open(smallSeekFile);
+ int seekTimes = 5;
+ LOG.info("multiple fold position seeking test...:");
+ for (int i = 0; i < seekTimes; i++) {
+ long pos = size / (seekTimes - i) - 1;
+ LOG.info("begin seeking for pos: " + pos);
+ instream.seek(pos);
+ assertTrue("expected position at:" + pos + ", but got:"
+ + instream.getPos(), instream.getPos() == pos);
+ LOG.info("completed seeking at pos: " + instream.getPos());
+ }
+ LOG.info("random position seeking test...:");
+ Random rand = new Random();
+ for (int i = 0; i < seekTimes; i++) {
+ long pos = Math.abs(rand.nextLong()) % size;
+ LOG.info("begin seeking for pos: " + pos);
+ instream.seek(pos);
+ assertTrue("expected position at:" + pos + ", but got:"
+ + instream.getPos(), instream.getPos() == pos);
+ LOG.info("completed seeking at pos: " + instream.getPos());
+ }
+ IOUtils.closeStream(instream);
+ }
+
+ @Test
+ public void testReadFile() throws Exception {
+ final int bufLen = 256;
+ final int sizeFlag = 5;
+ String filename = "readTestFile_" + sizeFlag + ".txt";
+ Path readTestFile = setPath("/test/" + filename);
+ long size = sizeFlag * 1024 * 1024;
+
+ ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255);
+ LOG.info(sizeFlag + "MB file created: /test/" + filename);
+
+ FSDataInputStream instream = this.fs.open(readTestFile);
+ byte[] buf = new byte[bufLen];
+ long bytesRead = 0;
+ while (bytesRead < size) {
+ int bytes;
+ if (size - bytesRead < bufLen) {
+ int remaining = (int)(size - bytesRead);
+ bytes = instream.read(buf, 0, remaining);
+ } else {
+ bytes = instream.read(buf, 0, bufLen);
+ }
+ bytesRead += bytes;
+
+ if (bytesRead % (1024 * 1024) == 0) {
+ int available = instream.available();
+ int remaining = (int)(size - bytesRead);
+ assertTrue("expected remaining:" + remaining + ", but got:" + available,
+ remaining == available);
+ LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024))
+ + " MB");
+ }
+ }
+ assertTrue(instream.available() == 0);
+ IOUtils.closeStream(instream);
+ }
+
+ @Test
+ public void testDirectoryModifiedTime() throws Exception {
+ Path emptyDirPath = setPath("/test/emptyDirectory");
+ fs.mkdirs(emptyDirPath);
+ FileStatus dirFileStatus = fs.getFileStatus(emptyDirPath);
+ assertTrue("expected the empty dir is new",
+ dirFileStatus.getModificationTime() > 0L);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java
new file mode 100644
index 00000000000..6b87d9ca466
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+
+/**
+ * Tests regular and multi-part upload functionality for AliyunOSSOutputStream.
+ */
+public class TestAliyunOSSOutputStream {
+ private FileSystem fs;
+ private static String testRootPath =
+ AliyunOSSTestUtils.generateUniqueTestPath();
+
+ @Rule
+ public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
+ conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024);
+ fs = AliyunOSSTestUtils.createTestFileSystem(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.delete(new Path(testRootPath), true);
+ }
+ }
+
+ protected Path getTestPath() {
+ return new Path(testRootPath + "/test-aliyun-oss");
+ }
+
+ @Test
+ public void testRegularUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+ }
+
+ @Test
+ public void testMultiPartUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
+ }
+
+ @Test
+ public void testMultiPartUploadLimit() throws IOException {
+ long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
+ assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+ long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
+ assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+ long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
+ 100 * 1024);
+ assert(10000 * 100 * 1024 / partSize3
+ < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+ long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
+ 100 * 1024);
+ assert(10001 * 100 * 1024 / partSize4
+ < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java
new file mode 100644
index 00000000000..624c606c6b8
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
+
+/**
+ * The contract of Aliyun OSS: only enabled if the test bucket is provided.
+ */
+public class AliyunOSSContract extends AbstractBondedFSContract {
+
+ public static final String CONTRACT_XML = "contract/aliyun-oss.xml";
+
+ public AliyunOSSContract(Configuration conf) {
+ super(conf);
+ //insert the base features
+ addConfResource(CONTRACT_XML);
+ }
+
+ @Override
+ public String getScheme() {
+ return "oss";
+ }
+
+ @Override
+ public Path getTestPath() {
+ String testUniqueForkId = System.getProperty("test.unique.fork.id");
+ return testUniqueForkId == null ? super.getTestPath() :
+ new Path("/" + testUniqueForkId, "test");
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java
new file mode 100644
index 00000000000..88dd8cd2267
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract creating tests.
+ */
+public class TestAliyunOSSContractCreate extends AbstractContractCreateTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java
new file mode 100644
index 00000000000..1658d806831
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract deleting tests.
+ */
+public class TestAliyunOSSContractDelete extends AbstractContractDeleteTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
new file mode 100644
index 00000000000..18d09d54149
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
+
+/**
+ * Contract test suite covering Aliyun OSS integration with DistCp.
+ */
+public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
+
+ private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration newConf = super.createConfiguration();
+ newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
+ newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
+ return newConf;
+ }
+
+ @Override
+ protected AliyunOSSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java
new file mode 100644
index 00000000000..c69124d9243
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test getFileStatus and related listing operations.
+ */
+public class TestAliyunOSSContractGetFileStatus
+ extends AbstractContractGetFileStatusTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java
new file mode 100644
index 00000000000..6cb754975ca
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract directory tests.
+ */
+public class TestAliyunOSSContractMkdir extends AbstractContractMkdirTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java
new file mode 100644
index 00000000000..099aba6296f
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract opening file tests.
+ */
+public class TestAliyunOSSContractOpen extends AbstractContractOpenTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java
new file mode 100644
index 00000000000..e15b3ba30de
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract renaming tests.
+ */
+public class TestAliyunOSSContractRename extends AbstractContractRenameTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java
new file mode 100644
index 00000000000..9faae374521
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Root dir operations against an Aliyun OSS bucket.
+ */
+public class TestAliyunOSSContractRootDir extends
+ AbstractContractRootDirectoryTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestAliyunOSSContractRootDir.class);
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+ @Override
+ public void testListEmptyRootDirectory() throws IOException {
+ for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) {
+ try {
+ super.testListEmptyRootDirectory();
+ break;
+ } catch (AssertionError | FileNotFoundException e) {
+ if (attempt < maxAttempts) {
+ LOG.info("Attempt {} of {} for empty root directory test failed. "
+ + "Attempting retry.", attempt, maxAttempts);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ fail("Test interrupted.");
+ break;
+ }
+ } else {
+ LOG.error(
+ "Empty root directory test failed {} attempts. Failing test.",
+ maxAttempts);
+ throw e;
+ }
+ }
+ }
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java
new file mode 100644
index 00000000000..d9b367440a2
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Aliyun OSS contract seeking tests.
+ */
+public class TestAliyunOSSContractSeek extends AbstractContractSeekTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new AliyunOSSContract(conf);
+ }
+
+ @Test
+ public void testSeekBeyondDownloadSize() throws Throwable {
+ describe("seek and read beyond download size.");
+
+ Path byteFile = path("byte_file.txt");
+ // 'fs.oss.multipart.download.size' = 100 * 1024
+ byte[] block = dataset(100 * 1024 + 10, 0, 255);
+ FileSystem fs = getFileSystem();
+ createFile(fs, byteFile, true, block);
+
+ FSDataInputStream instream = getFileSystem().open(byteFile);
+ instream.seek(100 * 1024 - 1);
+ assertEquals(100 * 1024 - 1, instream.getPos());
+ assertEquals(144, instream.read());
+ instream.seek(100 * 1024 + 1);
+ assertEquals(100 * 1024 + 1, instream.getPos());
+ assertEquals(146, instream.read());
+ }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml
new file mode 100644
index 00000000000..9ec4be6c8af
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml
@@ -0,0 +1,120 @@
+
+
+
+
+
+ fs.contract.test.random-seek-count
+ 10
+
+
+
+ fs.contract.is-blobstore
+ true
+
+
+
+ fs.contract.is-case-sensitive
+ true
+
+
+
+ fs.contract.rename-returns-false-if-source-missing
+ false
+
+
+
+ fs.contract.rename-remove-dest-if-empty-dir
+ false
+
+
+
+ fs.contract.supports-append
+ false
+
+
+
+ fs.contract.supports-atomic-directory-delete
+ false
+
+
+
+ fs.contract.supports-atomic-rename
+ false
+
+
+
+ fs.contract.supports-block-locality
+ false
+
+
+
+ fs.contract.supports-concat
+ false
+
+
+
+ fs.contract.supports-seek
+ true
+
+
+
+ fs.contract.supports-seek-on-closed-file
+ true
+
+
+
+ fs.contract.rejects-seek-past-eof
+ true
+
+
+
+ fs.contract.supports-strict-exceptions
+ true
+
+
+
+ fs.contract.supports-unix-permissions
+ false
+
+
+
+ fs.contract.rename-overwrites-dest
+ true
+
+
+
+ fs.contract.test.root-tests-enabled
+ true
+
+
+
+ fs.contract.supports-getfilestatus
+ true
+
+
+
+ fs.oss.multipart.download.size
+ 102400
+
+
+
+ fs.contract.create-visibility-delayed
+ true
+
+
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml
new file mode 100644
index 00000000000..fa4118c2162
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml
@@ -0,0 +1,46 @@
+
+
+
+
+
+
+ hadoop.tmp.dir
+ target/build/test
+ A base for other temporary directories.
+ true
+
+
+
+
+ hadoop.security.authentication
+ simple
+
+
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..bb5cbe5ec32
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=INFO,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index 3ecc51b1421..3af7aee392f 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -100,6 +100,12 @@
compile
${project.version}
+
+ org.apache.hadoop
+ hadoop-aliyun
+ compile
+ ${project.version}
+
org.apache.hadoop
hadoop-sls
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index e307a400926..117d12c0914 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -48,6 +48,7 @@
hadoop-aws
hadoop-azure
hadoop-azure-datalake
+ hadoop-aliyun