From 32a88442d0f9e9860b1f179da586894cea6a9e10 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Fri, 24 Nov 2017 14:06:01 +0800 Subject: [PATCH] HADOOP-14964. AliyunOSS: backport Aliyun OSS module to branch-2 and 2.8+ branches. Contributed by Sammi Chen. --- hadoop-project/pom.xml | 22 +- .../dev-support/findbugs-exclude.xml | 18 + hadoop-tools/hadoop-aliyun/pom.xml | 147 +++++ .../aliyun/oss/AliyunCredentialsProvider.java | 87 +++ .../fs/aliyun/oss/AliyunOSSFileSystem.java | 608 ++++++++++++++++++ .../aliyun/oss/AliyunOSSFileSystemStore.java | 549 ++++++++++++++++ .../fs/aliyun/oss/AliyunOSSInputStream.java | 262 ++++++++ .../fs/aliyun/oss/AliyunOSSOutputStream.java | 111 ++++ .../hadoop/fs/aliyun/oss/AliyunOSSUtils.java | 167 +++++ .../hadoop/fs/aliyun/oss/Constants.java | 113 ++++ .../hadoop/fs/aliyun/oss/package-info.java | 22 + .../markdown/tools/hadoop-aliyun/index.md | 294 +++++++++ .../fs/aliyun/oss/AliyunOSSTestUtils.java | 77 +++ .../fs/aliyun/oss/TestAliyunCredentials.java | 78 +++ .../oss/TestAliyunOSSFileSystemContract.java | 218 +++++++ .../oss/TestAliyunOSSFileSystemStore.java | 125 ++++ .../aliyun/oss/TestAliyunOSSInputStream.java | 155 +++++ .../aliyun/oss/TestAliyunOSSOutputStream.java | 91 +++ .../oss/contract/AliyunOSSContract.java | 49 ++ .../contract/TestAliyunOSSContractCreate.java | 35 + .../contract/TestAliyunOSSContractDelete.java | 34 + .../contract/TestAliyunOSSContractDistCp.java | 44 ++ .../TestAliyunOSSContractGetFileStatus.java | 35 + .../contract/TestAliyunOSSContractMkdir.java | 34 + .../contract/TestAliyunOSSContractOpen.java | 34 + .../contract/TestAliyunOSSContractRename.java | 35 + .../TestAliyunOSSContractRootDir.java | 69 ++ .../contract/TestAliyunOSSContractSeek.java | 60 ++ .../test/resources/contract/aliyun-oss.xml | 120 ++++ .../src/test/resources/core-site.xml | 46 ++ .../src/test/resources/log4j.properties | 23 + hadoop-tools/hadoop-tools-dist/pom.xml | 6 + hadoop-tools/pom.xml | 1 + 33 files changed, 3768 insertions(+), 1 deletion(-) create mode 100644 hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml create mode 100644 hadoop-tools/hadoop-aliyun/pom.xml create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java create mode 100644 hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java create mode 100644 hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java create mode 100644 
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java create mode 100644 hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml create mode 100644 hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml create mode 100644 hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 4eac7d5314e..4bb640fd37d 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -483,7 +483,11 @@ hadoop-aws ${project.version} - + + org.apache.hadoop + hadoop-aliyun + ${project.version} + org.apache.hadoop hadoop-kms @@ -1078,6 +1082,22 @@ 2.9.1 + + com.aliyun.oss + aliyun-sdk-oss + 2.8.1 + + + org.apache.httpcomponents + httpclient + + + commons-beanutils + commons-beanutils + + + + org.apache.curator curator-recipes diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml new file mode 100644 index 00000000000..40d78d0cd6c --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml @@ -0,0 +1,18 @@ + + + diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml new file mode 100644 index 00000000000..357786b5b3d --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/pom.xml @@ -0,0 +1,147 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 2.9.1-SNAPSHOT + ../../hadoop-project + + hadoop-aliyun + Apache Hadoop Aliyun OSS support + jar + + + UTF-8 + true + + + + + tests-off + + + src/test/resources/auth-keys.xml + + + + true + + + + tests-on + + + src/test/resources/auth-keys.xml + + + + false + + + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + true + true + ${basedir}/dev-support/findbugs-exclude.xml + + Max + + + + org.apache.maven.plugins + maven-surefire-plugin + + 3600 + + + + org.apache.maven.plugins + maven-dependency-plugin + + + deplist + compile + + list + + + + 
${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt + + + + + + + + + + junit + junit + test + + + + com.aliyun.oss + aliyun-sdk-oss + compile + + + + org.apache.hadoop + hadoop-common + compile + + + + org.apache.hadoop + hadoop-common + test + test-jar + + + org.apache.hadoop + hadoop-distcp + test + + + org.apache.hadoop + hadoop-distcp + ${project.version} + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + + + diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java new file mode 100644 index 00000000000..b46c67aa5e7 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.common.auth.Credentials; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.auth.DefaultCredentials; +import com.aliyun.oss.common.auth.InvalidCredentialsException; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Support session credentials for authenticating with Aliyun. 
+ */ +public class AliyunCredentialsProvider implements CredentialsProvider { + private Credentials credentials = null; + + public AliyunCredentialsProvider(Configuration conf) + throws IOException { + String accessKeyId; + String accessKeySecret; + String securityToken; + try { + accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID); + accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET); + } catch (IOException e) { + throw new InvalidCredentialsException(e); + } + + try { + securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN); + } catch (IOException e) { + securityToken = null; + } + + if (StringUtils.isEmpty(accessKeyId) + || StringUtils.isEmpty(accessKeySecret)) { + throw new InvalidCredentialsException( + "AccessKeyId and AccessKeySecret should not be null or empty."); + } + + if (StringUtils.isNotEmpty(securityToken)) { + credentials = new DefaultCredentials(accessKeyId, accessKeySecret, + securityToken); + } else { + credentials = new DefaultCredentials(accessKeyId, accessKeySecret); + } + } + + @Override + public void setCredentials(Credentials creds) { + if (creds == null) { + throw new InvalidCredentialsException("Credentials should not be null."); + } + + credentials = creds; + } + + @Override + public Credentials getCredentials() { + if (credentials == null) { + throw new InvalidCredentialsException("Invalid credentials"); + } + + return credentials; + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java new file mode 100644 index 00000000000..3561b0241ea --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -0,0 +1,608 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
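A minimal usage sketch of the provider above, assuming the fs.oss.* constant names defined later in Constants.java; the credential values are placeholders, not real keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;

public class CredentialsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(ACCESS_KEY_ID, "<access-key-id>");         // placeholder value
    conf.set(ACCESS_KEY_SECRET, "<access-key-secret>"); // placeholder value
    // SECURITY_TOKEN is optional; setting it switches to session credentials.
    AliyunCredentialsProvider provider = new AliyunCredentialsProvider(conf);
    System.out.println(provider.getCredentials().getAccessKeyId());
  }
}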
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; + +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.ObjectListing; +import com.aliyun.oss.model.ObjectMetadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Implementation of {@link FileSystem} for + * Aliyun OSS, used to access OSS blob system in a filesystem style. + */ +public class AliyunOSSFileSystem extends FileSystem { + private static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileSystem.class); + private URI uri; + private String bucket; + private Path workingDir; + private AliyunOSSFileSystemStore store; + private int maxKeys; + + @Override + public FSDataOutputStream append(Path path, int bufferSize, + Progressable progress) throws IOException { + throw new IOException("Append is not supported!"); + } + + @Override + public void close() throws IOException { + try { + store.close(); + } finally { + super.close(); + } + } + + @Override + public FSDataOutputStream create(Path path, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + String key = pathToKey(path); + FileStatus status = null; + + try { + // get the status or throw a FNFE + status = getFileStatus(path); + + // if the thread reaches here, there is something at the path + if (status.isDirectory()) { + // path references a directory + throw new FileAlreadyExistsException(path + " is a directory"); + } + if (!overwrite) { + // path references a file and overwrite is disabled + throw new FileAlreadyExistsException(path + " already exists"); + } + LOG.debug("Overwriting file {}", path); + } catch (FileNotFoundException e) { + // this means the file is not found + } + + return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), + store, key, progress, statistics), (Statistics)(null)); + } + + /** + * {@inheritDoc} + * @throws FileNotFoundException if the parent directory is not present -or + * is not a directory. 
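The create() path above first probes getFileStatus(): an existing directory always fails, and an existing file fails unless overwrite is set. A client-side sketch of those semantics, assuming a bucket named my-bucket with endpoint and credentials already configured:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // fs.oss.* settings assumed present
    FileSystem fs = FileSystem.get(URI.create("oss://my-bucket/"), conf);
    Path p = new Path("/demo/hello.txt");
    // overwrite=true replaces an existing file; with false, an existing file
    // raises FileAlreadyExistsException, and a directory always does.
    FSDataOutputStream out = fs.create(p, true);
    try {
      out.writeBytes("hello oss");
    } finally {
      out.close();
    }
  }
}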
+ */ + @Override + public FSDataOutputStream createNonRecursive(Path path, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + Path parent = path.getParent(); + if (parent != null) { + // expect this to raise an exception if there is no parent + if (!getFileStatus(parent).isDirectory()) { + throw new FileAlreadyExistsException("Not a directory: " + parent); + } + } + return create(path, permission, + flags.contains(CreateFlag.OVERWRITE), bufferSize, + replication, blockSize, progress); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + try { + return innerDelete(getFileStatus(path), recursive); + } catch (FileNotFoundException e) { + LOG.debug("Couldn't delete {} - does not exist", path); + return false; + } + } + + /** + * Delete an object. See {@link #delete(Path, boolean)}. + * + * @param status fileStatus object + * @param recursive if path is a directory and set to + * true, the directory is deleted else throws an exception. In + * case of a file the recursive can be set to either true or false. + * @return true if delete is successful else false. + * @throws IOException due to inability to delete a directory or file. + */ + private boolean innerDelete(FileStatus status, boolean recursive) + throws IOException { + Path f = status.getPath(); + String p = f.toUri().getPath(); + FileStatus[] statuses; + // indicating root directory "/". + if (p.equals("/")) { + statuses = listStatus(status.getPath()); + boolean isEmptyDir = statuses.length <= 0; + return rejectRootDirectoryDelete(isEmptyDir, recursive); + } + + String key = pathToKey(f); + if (status.isDirectory()) { + if (!recursive) { + // Check whether it is an empty directory or not + statuses = listStatus(status.getPath()); + if (statuses.length > 0) { + throw new IOException("Cannot remove directory " + f + + ": It is not empty!"); + } else { + // Delete empty directory without '-r' + key = AliyunOSSUtils.maybeAddTrailingSlash(key); + store.deleteObject(key); + } + } else { + store.deleteDirs(key); + } + } else { + store.deleteObject(key); + } + + createFakeDirectoryIfNecessary(f); + return true; + } + + /** + * Implements the specific logic to reject root directory deletion. + * The caller must return the result of this call, rather than + * attempt to continue with the delete operation: deleting root + * directories is never allowed. This method simply implements + * the policy of when to return an exit code versus raise an exception. + * @param isEmptyDir empty directory or not + * @param recursive recursive flag from command + * @return a return code for the operation + * @throws PathIOException if the operation was explicitly rejected. 
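The policy above makes deleting a non-empty root a soft failure under the recursive flag and a hard failure otherwise. A sketch of the three observable outcomes (the bucket is a placeholder):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RootDeleteDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs =
        FileSystem.get(URI.create("oss://my-bucket/"), new Configuration());
    // Empty root: returns true. Non-empty root with recursive=true: returns
    // false and deletes nothing. Non-empty root with recursive=false:
    // throws PathIOException.
    boolean outcome = fs.delete(new Path("/"), true);
    System.out.println("delete / returned " + outcome);
  }
}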
+ */ + private boolean rejectRootDirectoryDelete(boolean isEmptyDir, + boolean recursive) throws IOException { + LOG.info("oss delete the {} root directory of {}", bucket, recursive); + if (isEmptyDir) { + return true; + } + if (recursive) { + return false; + } else { + // reject + throw new PathIOException(bucket, "Cannot delete root path"); + } + } + + private void createFakeDirectoryIfNecessary(Path f) throws IOException { + String key = pathToKey(f); + if (StringUtils.isNotEmpty(key) && !exists(f)) { + LOG.debug("Creating new fake directory at {}", f); + mkdir(pathToKey(f.getParent())); + } + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = pathToKey(qualifiedPath); + + // Root always exists + if (key.length() == 0) { + return new FileStatus(0, true, 1, 0, 0, qualifiedPath); + } + + ObjectMetadata meta = store.getObjectMetadata(key); + // If key not found and key does not end with "/" + if (meta == null && !key.endsWith("/")) { + // In case of 'dir + "/"' + key += "/"; + meta = store.getObjectMetadata(key); + } + if (meta == null) { + ObjectListing listing = store.listObjects(key, 1, null, false); + if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) || + CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) { + return new FileStatus(0, true, 1, 0, 0, qualifiedPath); + } else { + throw new FileNotFoundException(path + ": No such file or directory!"); + } + } else if (objectRepresentsDirectory(key, meta.getContentLength())) { + return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(), + qualifiedPath); + } else { + return new FileStatus(meta.getContentLength(), false, 1, + getDefaultBlockSize(path), meta.getLastModified().getTime(), + qualifiedPath); + } + } + + @Override + public String getScheme() { + return "oss"; + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Deprecated + public long getDefaultBlockSize() { + return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT); + } + + @Override + public String getCanonicalServiceName() { + // Does not support Token + return null; + } + + /** + * Initialize new FileSystem. + * + * @param name the uri of the file system, including host, port, etc. + * @param conf configuration of the file system + * @throws IOException IO problems + */ + public void initialize(URI name, Configuration conf) throws IOException { + super.initialize(name, conf); + + bucket = name.getHost(); + uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); + workingDir = new Path("/user", + System.getProperty("user.name")).makeQualified(uri, null); + + store = new AliyunOSSFileSystemStore(); + store.initialize(name, conf, statistics); + maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + setConf(conf); + } + + /** + * Check if OSS object represents a directory. + * + * @param name object key + * @param size object content length + * @return true if object represents a directory + */ + private boolean objectRepresentsDirectory(final String name, + final long size) { + return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L; + } + + /** + * Turn a path (relative or otherwise) into an OSS key. + * + * @param path the path of the file. + * @return the key of the object that represents the file. 
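A standalone sketch of the path-to-key mapping, with the working directory passed in explicitly since pathToKey above is private; the example paths are illustrative:

import org.apache.hadoop.fs.Path;

public class PathToKeyDemo {
  // Mirrors the private pathToKey above; relative paths are resolved against
  // the working directory, then the leading "/" is stripped to form the key.
  static String pathToKey(Path workingDir, Path path) {
    if (!path.isAbsolute()) {
      path = new Path(workingDir, path);
    }
    return path.toUri().getPath().substring(1);
  }

  public static void main(String[] args) {
    Path wd = new Path("/user/alice");
    System.out.println(pathToKey(wd, new Path("/data/part-0"))); // data/part-0
    System.out.println(pathToKey(wd, new Path("notes.txt")));    // user/alice/notes.txt
  }
}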
+ */ + private String pathToKey(Path path) { + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + + return path.toUri().getPath().substring(1); + } + + private Path keyToPath(String key) { + return new Path("/" + key); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + String key = pathToKey(path); + if (LOG.isDebugEnabled()) { + LOG.debug("List status for path: " + path); + } + + final List result = new ArrayList(); + final FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: doing listObjects for directory " + key); + } + + ObjectListing objects = store.listObjects(key, maxKeys, null, false); + while (true) { + statistics.incrementReadOps(1); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + String objKey = objectSummary.getKey(); + if (objKey.equals(key + "/")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + objKey); + } + continue; + } else { + Path keyPath = keyToPath(objectSummary.getKey()) + .makeQualified(uri, workingDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: fi: " + keyPath); + } + result.add(new FileStatus(objectSummary.getSize(), false, 1, + getDefaultBlockSize(keyPath), + objectSummary.getLastModified().getTime(), keyPath)); + } + } + + for (String prefix : objects.getCommonPrefixes()) { + if (prefix.equals(key + "/")) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring: " + prefix); + } + continue; + } else { + Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd: " + keyPath); + } + result.add(getFileStatus(keyPath)); + } + } + + if (objects.isTruncated()) { + if (LOG.isDebugEnabled()) { + LOG.debug("listStatus: list truncated - getting next batch"); + } + String nextMarker = objects.getNextMarker(); + objects = store.listObjects(key, maxKeys, nextMarker, false); + statistics.incrementReadOps(1); + } else { + break; + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding: rd (not a dir): " + path); + } + result.add(fileStatus); + } + + return result.toArray(new FileStatus[result.size()]); + } + + /** + * Used to create an empty file that represents an empty directory. + * + * @param key directory path + * @return true if directory is successfully created + * @throws IOException + */ + private boolean mkdir(final String key) throws IOException { + String dirName = key; + if (StringUtils.isNotEmpty(key)) { + if (!key.endsWith("/")) { + dirName += "/"; + } + store.storeEmptyFile(dirName); + } + return true; + } + + @Override + public boolean mkdirs(Path path, FsPermission permission) + throws IOException { + try { + FileStatus fileStatus = getFileStatus(path); + + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + validatePath(path); + String key = pathToKey(path); + return mkdir(key); + } + } + + /** + * Check whether the path is a valid path. + * + * @param path the path to be checked. 
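listStatus above drains a paged listing via getNextMarker(). The same pattern can be reused directly against the store; a sketch assuming an already-initialized AliyunOSSFileSystemStore:

import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

public class ListingDemo {
  // maxKeys bounds each page; getNextMarker() resumes the scan until
  // isTruncated() reports the listing is complete.
  static void listAll(AliyunOSSFileSystemStore store, String key, int maxKeys) {
    ObjectListing objects = store.listObjects(key, maxKeys, null, false);
    while (true) {
      for (OSSObjectSummary s : objects.getObjectSummaries()) {
        System.out.println(s.getKey() + " (" + s.getSize() + " bytes)");
      }
      for (String dir : objects.getCommonPrefixes()) {
        System.out.println(dir + " (dir)");
      }
      if (!objects.isTruncated()) {
        break;
      }
      objects = store.listObjects(key, maxKeys, objects.getNextMarker(), false);
    }
  }
}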
+ * @throws IOException + */ + private void validatePath(Path path) throws IOException { + Path fPart = path.getParent(); + do { + try { + FileStatus fileStatus = getFileStatus(fPart); + if (fileStatus.isDirectory()) { + // If path exists and a directory, exit + break; + } else { + throw new FileAlreadyExistsException(String.format( + "Can't make directory for path '%s', it is a file.", fPart)); + } + } catch (FileNotFoundException fnfe) { + } + fPart = fPart.getParent(); + } while (fPart != null); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + final FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open " + path + + " because it is a directory"); + } + + return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, + pathToKey(path), fileStatus.getLen(), statistics)); + } + + @Override + public boolean rename(Path srcPath, Path dstPath) throws IOException { + if (srcPath.isRoot()) { + // Cannot rename root of file system + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename the root of a filesystem"); + } + return false; + } + Path parent = dstPath.getParent(); + while (parent != null && !srcPath.equals(parent)) { + parent = parent.getParent(); + } + if (parent != null) { + return false; + } + FileStatus srcStatus = getFileStatus(srcPath); + FileStatus dstStatus; + try { + dstStatus = getFileStatus(dstPath); + } catch (FileNotFoundException fnde) { + dstStatus = null; + } + if (dstStatus == null) { + // If dst doesn't exist, check whether dst dir exists or not + dstStatus = getFileStatus(dstPath.getParent()); + if (!dstStatus.isDirectory()) { + throw new IOException(String.format( + "Failed to rename %s to %s, %s is a file", srcPath, dstPath, + dstPath.getParent())); + } + } else { + if (srcStatus.getPath().equals(dstStatus.getPath())) { + return !srcStatus.isDirectory(); + } else if (dstStatus.isDirectory()) { + // If dst is a directory + dstPath = new Path(dstPath, srcPath.getName()); + FileStatus[] statuses; + try { + statuses = listStatus(dstPath); + } catch (FileNotFoundException fnde) { + statuses = null; + } + if (statuses != null && statuses.length > 0) { + // If dst exists and not a directory / not empty + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file already exists or not empty!", + srcPath, dstPath)); + } + } else { + // If dst is not a directory + throw new FileAlreadyExistsException(String.format( + "Failed to rename %s to %s, file already exists!", srcPath, + dstPath)); + } + } + if (srcStatus.isDirectory()) { + copyDirectory(srcPath, dstPath); + } else { + copyFile(srcPath, dstPath); + } + + return srcPath.equals(dstPath) || delete(srcPath, true); + } + + /** + * Copy file from source path to destination path. + * (the caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcPath source path. + * @param dstPath destination path. + * @return true if file is successfully copied. + */ + private boolean copyFile(Path srcPath, Path dstPath) { + String srcKey = pathToKey(srcPath); + String dstKey = pathToKey(dstPath); + return store.copyFile(srcKey, dstKey); + } + + /** + * Copy a directory from source path to destination path. + * (the caller should make sure srcPath is a directory, and dstPath is valid) + * + * @param srcPath source path. + * @param dstPath destination path. + * @return true if directory is successfully copied. 
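Since OSS has no native rename, rename() above is a server-side copy followed by a delete of the source, and moving a directory into its own subtree is refused. Illustrative usage; bucket and paths are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs =
        FileSystem.get(URI.create("oss://my-bucket/"), new Configuration());
    // Copy-then-delete: cost grows with the number of objects moved.
    boolean moved = fs.rename(new Path("/staging/job1"), new Path("/done/job1"));
    // Renaming a directory under itself returns false:
    boolean nested = fs.rename(new Path("/a"), new Path("/a/b"));
    System.out.println(moved + " " + nested);
  }
}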
+ */ + private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { + String srcKey = AliyunOSSUtils + .maybeAddTrailingSlash(pathToKey(srcPath)); + String dstKey = AliyunOSSUtils + .maybeAddTrailingSlash(pathToKey(dstPath)); + + if (dstKey.startsWith(srcKey)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cannot rename a directory to a subdirectory of self"); + } + return false; + } + + store.storeEmptyFile(dstKey); + ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); + statistics.incrementReadOps(1); + // Copy files from src folder to dst + while (true) { + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + String newKey = + dstKey.concat(objectSummary.getKey().substring(srcKey.length())); + store.copyFile(objectSummary.getKey(), newKey); + } + if (objects.isTruncated()) { + String nextMarker = objects.getNextMarker(); + objects = store.listObjects(srcKey, maxKeys, nextMarker, true); + statistics.incrementReadOps(1); + } else { + break; + } + } + return true; + } + + @Override + public void setWorkingDirectory(Path dir) { + this.workingDir = dir; + } + + public AliyunOSSFileSystemStore getStore() { + return store; + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java new file mode 100644 index 00000000000..aba3db8cf8a --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -0,0 +1,549 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.ClientConfiguration; +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.common.auth.CredentialsProvider; +import com.aliyun.oss.common.comm.Protocol; +import com.aliyun.oss.model.AbortMultipartUploadRequest; +import com.aliyun.oss.model.CannedAccessControlList; +import com.aliyun.oss.model.CompleteMultipartUploadRequest; +import com.aliyun.oss.model.CompleteMultipartUploadResult; +import com.aliyun.oss.model.CopyObjectResult; +import com.aliyun.oss.model.DeleteObjectsRequest; +import com.aliyun.oss.model.DeleteObjectsResult; +import com.aliyun.oss.model.GetObjectRequest; +import com.aliyun.oss.model.InitiateMultipartUploadRequest; +import com.aliyun.oss.model.InitiateMultipartUploadResult; +import com.aliyun.oss.model.ListObjectsRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.ObjectListing; +import com.aliyun.oss.model.OSSObjectSummary; +import com.aliyun.oss.model.PartETag; +import com.aliyun.oss.model.PutObjectResult; +import com.aliyun.oss.model.UploadPartCopyRequest; +import com.aliyun.oss.model.UploadPartCopyResult; +import com.aliyun.oss.model.UploadPartRequest; +import com.aliyun.oss.model.UploadPartResult; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Core implementation of Aliyun OSS Filesystem for Hadoop. + * Provides the bridging logic between Hadoop's abstract filesystem and + * Aliyun OSS. + */ +public class AliyunOSSFileSystemStore { + public static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSFileSystemStore.class); + private FileSystem.Statistics statistics; + private OSSClient ossClient; + private String bucketName; + private long uploadPartSize; + private long multipartThreshold; + private long partSize; + private int maxKeys; + private String serverSideEncryptionAlgorithm; + + public void initialize(URI uri, Configuration conf, + FileSystem.Statistics stat) throws IOException { + statistics = stat; + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, + MAXIMUM_CONNECTIONS_DEFAULT)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, + SECURE_CONNECTIONS_DEFAULT); + clientConf.setProtocol(secureConnections ? 
Protocol.HTTPS : Protocol.HTTP); + clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, + MAX_ERROR_RETRIES_DEFAULT)); + clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, + ESTABLISH_TIMEOUT_DEFAULT)); + clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, + SOCKET_TIMEOUT_DEFAULT)); + + String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); + int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); + if (StringUtils.isNotEmpty(proxyHost)) { + clientConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + clientConf.setProxyPort(proxyPort); + } else { + if (secureConnections) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + clientConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + clientConf.setProxyPort(80); + } + } + String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); + String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + + PROXY_PASSWORD_KEY + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + clientConf.setProxyUsername(proxyUsername); + clientConf.setProxyPassword(proxyPassword); + clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); + clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); + } else if (proxyPort >= 0) { + String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + + PROXY_HOST_KEY; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); + if (StringUtils.isEmpty(endPoint)) { + throw new IllegalArgumentException("Aliyun OSS endpoint should not be " + + "null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); + } + CredentialsProvider provider = + AliyunOSSUtils.getCredentialsProvider(conf); + ossClient = new OSSClient(endPoint, provider, clientConf); + uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, + MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, + MULTIPART_UPLOAD_SIZE_DEFAULT); + if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { + partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; + } + serverSideEncryptionAlgorithm = + conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); + + if (uploadPartSize < 5 * 1024 * 1024) { + LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); + uploadPartSize = 5 * 1024 * 1024; + } + + if (multipartThreshold < 5 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); + multipartThreshold = 5 * 1024 * 1024; + } + + if (multipartThreshold > 1024 * 1024 * 1024) { + LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); + multipartThreshold = 1024 * 1024 * 1024; + } + + String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); + if (StringUtils.isNotEmpty(cannedACLName)) { + CannedAccessControlList cannedACL = + CannedAccessControlList.valueOf(cannedACLName); + ossClient.setBucketAcl(bucketName, cannedACL); + } + + maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); + bucketName = uri.getHost(); + } + + /** + * Delete an object, and update write operation statistics. + * + * @param key key to blob to delete. 
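initialize() above clamps the multipart settings: the part size and threshold are both floored at 5 MB, and the threshold is additionally capped at 1 GB. A self-contained sketch of that arithmetic:

public class MultipartLimitsDemo {
  public static void main(String[] args) {
    final long mb = 1024 * 1024;
    long uploadPartSize = 1 * mb;        // configured too small
    long multipartThreshold = 2048 * mb; // configured too large

    // Same clamping as initialize():
    if (uploadPartSize < 5 * mb) {
      uploadPartSize = 5 * mb;
    }
    if (multipartThreshold < 5 * mb) {
      multipartThreshold = 5 * mb;
    }
    if (multipartThreshold > 1024 * mb) {
      multipartThreshold = 1024 * mb;
    }
    // Prints 5242880 (5 MB) and 1073741824 (1 GB).
    System.out.println(uploadPartSize + " " + multipartThreshold);
  }
}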
+ */ + public void deleteObject(String key) { + ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); + } + + /** + * Delete a list of keys, and update write operation statistics. + * + * @param keysToDelete collection of keys to delete. + * @throws IOException if failed to delete objects. + */ + public void deleteObjects(List keysToDelete) throws IOException { + if (CollectionUtils.isEmpty(keysToDelete)) { + LOG.warn("Keys to delete is empty."); + return; + } + + int retry = 10; + int tries = 0; + List deleteFailed = keysToDelete; + while(CollectionUtils.isNotEmpty(deleteFailed)) { + DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName); + deleteRequest.setKeys(deleteFailed); + // There are two modes to do batch delete: + // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects + // which were deleted successfully. + // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects + // which were deleted unsuccessfully. + // Here, we choose the simple mode to do batch delete. + deleteRequest.setQuiet(true); + DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest); + deleteFailed = result.getDeletedObjects(); + tries++; + if (tries == retry) { + break; + } + } + + if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) { + // Most of time, it is impossible to try 10 times, expect the + // Aliyun OSS service problems. + throw new IOException("Failed to delete Aliyun OSS objects for " + + tries + " times."); + } + } + + /** + * Delete a directory from Aliyun OSS. + * + * @param key directory key to delete. + * @throws IOException if failed to delete directory. + */ + public void deleteDirs(String key) throws IOException { + key = AliyunOSSUtils.maybeAddTrailingSlash(key); + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(key); + listRequest.setDelimiter(null); + listRequest.setMaxKeys(maxKeys); + + while (true) { + ObjectListing objects = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + List keysToDelete = new ArrayList(); + for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { + keysToDelete.add(objectSummary.getKey()); + } + deleteObjects(keysToDelete); + if (objects.isTruncated()) { + listRequest.setMarker(objects.getNextMarker()); + } else { + break; + } + } + } + + /** + * Return metadata of a given object key. + * + * @param key object key. + * @return return null if key does not exist. + */ + public ObjectMetadata getObjectMetadata(String key) { + try { + return ossClient.getObjectMetadata(bucketName, key); + } catch (OSSException osse) { + return null; + } finally { + statistics.incrementReadOps(1); + } + } + + /** + * Upload an empty file as an OSS object, using single upload. + * + * @param key object key. + * @throws IOException if failed to upload object. + */ + public void storeEmptyFile(String key) throws IOException { + ObjectMetadata dirMeta = new ObjectMetadata(); + byte[] buffer = new byte[0]; + ByteArrayInputStream in = new ByteArrayInputStream(buffer); + dirMeta.setContentLength(0); + try { + ossClient.putObject(bucketName, key, in, dirMeta); + } finally { + in.close(); + } + } + + /** + * Copy an object from source key to destination key. + * + * @param srcKey source key. + * @param dstKey destination key. + * @return true if file is successfully copied. 
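deleteObjects() above runs the batch delete in quiet mode, where getDeletedObjects() returns the keys that failed, so each retry resubmits only the failed subset, up to 10 rounds. Illustrative calls against an initialized store; the keys are placeholders:

import java.util.Arrays;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

public class DeleteDemo {
  static void cleanup(AliyunOSSFileSystemStore store) throws Exception {
    // A directory marker object carries a trailing "/":
    store.deleteObject("tmp/empty-dir/");
    // Quiet-mode batch delete; failures are retried up to 10 rounds:
    store.deleteObjects(Arrays.asList("tmp/a", "tmp/b", "tmp/c"));
    // Page through and batch-delete everything under a prefix:
    store.deleteDirs("tmp");
  }
}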
+ */ + public boolean copyFile(String srcKey, String dstKey) { + ObjectMetadata objectMeta = + ossClient.getObjectMetadata(bucketName, srcKey); + long contentLength = objectMeta.getContentLength(); + if (contentLength <= multipartThreshold) { + return singleCopy(srcKey, dstKey); + } else { + return multipartCopy(srcKey, contentLength, dstKey); + } + } + + /** + * Use single copy to copy an OSS object. + * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param dstKey destination key. + * @return true if object is successfully copied. + */ + private boolean singleCopy(String srcKey, String dstKey) { + CopyObjectResult copyResult = + ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); + LOG.debug(copyResult.getETag()); + return true; + } + + /** + * Use multipart copy to copy an OSS object. + * (The caller should make sure srcPath is a file and dstPath is valid) + * + * @param srcKey source key. + * @param contentLength data size of the object to copy. + * @param dstKey destination key. + * @return true if success, or false if upload is aborted. + */ + private boolean multipartCopy(String srcKey, long contentLength, + String dstKey) { + long realPartSize = + AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize); + int partNum = (int) (contentLength / realPartSize); + if (contentLength % realPartSize != 0) { + partNum++; + } + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, dstKey); + ObjectMetadata meta = new ObjectMetadata(); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + String uploadId = initiateMultipartUploadResult.getUploadId(); + List partETags = new ArrayList(); + try { + for (int i = 0; i < partNum; i++) { + long skipBytes = realPartSize * i; + long size = (realPartSize < contentLength - skipBytes) ? + realPartSize : contentLength - skipBytes; + UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); + partCopyRequest.setSourceBucketName(bucketName); + partCopyRequest.setSourceKey(srcKey); + partCopyRequest.setBucketName(bucketName); + partCopyRequest.setKey(dstKey); + partCopyRequest.setUploadId(uploadId); + partCopyRequest.setPartSize(size); + partCopyRequest.setBeginIndex(skipBytes); + partCopyRequest.setPartNumber(i + 1); + UploadPartCopyResult partCopyResult = + ossClient.uploadPartCopy(partCopyRequest); + statistics.incrementWriteOps(1); + partETags.add(partCopyResult.getPartETag()); + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, dstKey, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + return true; + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + return false; + } + } + + /** + * Upload a file as an OSS object, using single upload. + * + * @param key object key. + * @param file local file to upload. 
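multipartCopy() above derives the part count as the ceiling of contentLength over realPartSize, and copies part i from offset i * realPartSize. A worked sketch of the same arithmetic, assuming the OSS part-number limit of 10000 behind MULTIPART_UPLOAD_PART_NUM_LIMIT:

public class PartMathDemo {
  public static void main(String[] args) {
    final long mb = 1024 * 1024;
    long contentLength = 1300 * mb; // object size
    long minPartSize = 100 * mb;    // configured multipart size

    // calculatePartSize(): never allow more parts than the multipart limit.
    long realPartSize = Math.max(minPartSize, contentLength / 10000 + 1);

    // Part count, as in multipartCopy():
    int partNum = (int) (contentLength / realPartSize);
    if (contentLength % realPartSize != 0) {
      partNum++;
    }
    // 13 parts of 100 MB; part i copies the range starting at i * realPartSize.
    System.out.println(realPartSize + " bytes/part, " + partNum + " parts");
  }
}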
+ * @throws IOException if failed to upload object. + */ + public void uploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + FileInputStream fis = new FileInputStream(object); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(object.length()); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + try { + PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); + LOG.debug(result.getETag()); + statistics.incrementWriteOps(1); + } finally { + fis.close(); + } + } + + /** + * Upload a file as an OSS object, using multipart upload. + * + * @param key object key. + * @param file local file to upload. + * @throws IOException if failed to upload object. + */ + public void multipartUploadObject(String key, File file) throws IOException { + File object = file.getAbsoluteFile(); + long dataLen = object.length(); + long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); + int partNum = (int) (dataLen / realPartSize); + if (dataLen % realPartSize != 0) { + partNum += 1; + } + + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, key); + ObjectMetadata meta = new ObjectMetadata(); + if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { + meta.setServerSideEncryption(serverSideEncryptionAlgorithm); + } + initiateMultipartUploadRequest.setObjectMetadata(meta); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + List partETags = new ArrayList(); + String uploadId = initiateMultipartUploadResult.getUploadId(); + + try { + for (int i = 0; i < partNum; i++) { + //TODO Optimize this, avoid opening the object multiple times. + FileInputStream fis = new FileInputStream(object); + try { + long skipBytes = realPartSize * i; + AliyunOSSUtils.skipFully(fis, skipBytes); + long size = (realPartSize < dataLen - skipBytes) ? + realPartSize : dataLen - skipBytes; + UploadPartRequest uploadPartRequest = new UploadPartRequest(); + uploadPartRequest.setBucketName(bucketName); + uploadPartRequest.setKey(key); + uploadPartRequest.setUploadId(uploadId); + uploadPartRequest.setInputStream(fis); + uploadPartRequest.setPartSize(size); + uploadPartRequest.setPartNumber(i + 1); + UploadPartResult uploadPartResult = + ossClient.uploadPart(uploadPartRequest); + statistics.incrementWriteOps(1); + partETags.add(uploadPartResult.getPartETag()); + } finally { + fis.close(); + } + } + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, key, + uploadId, partETags); + CompleteMultipartUploadResult completeMultipartUploadResult = + ossClient.completeMultipartUpload(completeMultipartUploadRequest); + LOG.debug(completeMultipartUploadResult.getETag()); + } catch (OSSException | ClientException e) { + AbortMultipartUploadRequest abortMultipartUploadRequest = + new AbortMultipartUploadRequest(bucketName, key, uploadId); + ossClient.abortMultipartUpload(abortMultipartUploadRequest); + } + } + + /** + * list objects. + * + * @param prefix prefix. + * @param maxListingLength max no. of entries + * @param marker last key in any previous search. + * @param recursive whether to list directory recursively. + * @return a list of matches. 
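The recursive flag documented above toggles the delimiter: "/" yields a single directory level with sub-directories reported as common prefixes, while null yields every key under the prefix. Illustrative calls ("logs" is a placeholder prefix):

import com.aliyun.oss.model.ObjectListing;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

public class DelimiterDemo {
  static void compare(AliyunOSSFileSystemStore store) {
    // recursive=false -> delimiter "/": one level, sub-dirs as common prefixes.
    ObjectListing shallow = store.listObjects("logs", 100, null, false);
    System.out.println("dirs: " + shallow.getCommonPrefixes());
    // recursive=true -> delimiter null: every key under the prefix.
    ObjectListing deep = store.listObjects("logs", 100, null, true);
    System.out.println("keys: " + deep.getObjectSummaries().size());
  }
}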
+ */ + public ObjectListing listObjects(String prefix, int maxListingLength, + String marker, boolean recursive) { + String delimiter = recursive ? null : "/"; + prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); + ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); + listRequest.setPrefix(prefix); + listRequest.setDelimiter(delimiter); + listRequest.setMaxKeys(maxListingLength); + listRequest.setMarker(marker); + + ObjectListing listing = ossClient.listObjects(listRequest); + statistics.incrementReadOps(1); + return listing; + } + + /** + * Retrieve a part of an object. + * + * @param key the object name that is being retrieved from the Aliyun OSS. + * @param byteStart start position. + * @param byteEnd end position. + * @return This method returns null if the key is not found. + */ + public InputStream retrieve(String key, long byteStart, long byteEnd) { + try { + GetObjectRequest request = new GetObjectRequest(bucketName, key); + request.setRange(byteStart, byteEnd); + return ossClient.getObject(request).getObjectContent(); + } catch (OSSException | ClientException e) { + return null; + } + } + + /** + * Close OSS client properly. + */ + public void close() { + if (ossClient != null) { + ossClient.shutdown(); + ossClient = null; + } + } + + /** + * Clean up all objects matching the prefix. + * + * @param prefix Aliyun OSS object prefix. + * @throws IOException if failed to clean up objects. + */ + public void purge(String prefix) throws IOException { + String key; + try { + ObjectListing objects = listObjects(prefix, maxKeys, null, true); + for (OSSObjectSummary object : objects.getObjectSummaries()) { + key = object.getKey(); + ossClient.deleteObject(bucketName, key); + } + + for (String dir: objects.getCommonPrefixes()) { + deleteDirs(dir); + } + } catch (OSSException | ClientException e) { + LOG.error("Failed to purge " + prefix); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java new file mode 100644 index 00000000000..3b2bc022e22 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
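retrieve() above signals OSS or client errors by returning null rather than throwing, so callers must null-check, as AliyunOSSInputStream does. A sketch of a ranged read of the first kilobyte of an object (the range is inclusive at both ends):

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;

public class RangedReadDemo {
  static byte[] readFirstKb(AliyunOSSFileSystemStore store, String key)
      throws IOException {
    InputStream in = store.retrieve(key, 0, 1023);
    if (in == null) { // retrieve() reports failure with null, not an exception
      throw new IOException("Null IO stream for " + key);
    }
    try {
      byte[] buf = new byte[1024];
      int off = 0;
      int n;
      while (off < buf.length && (n = in.read(buf, off, buf.length - off)) > 0) {
        off += n;
      }
      return buf;
    } finally {
      in.close();
    }
  }
}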
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; + +/** + * The input stream for OSS blob system. + * The class uses multi-part downloading to read data from the object content + * stream. + */ +public class AliyunOSSInputStream extends FSInputStream { + public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class); + private final long downloadPartSize; + private AliyunOSSFileSystemStore store; + private final String key; + private Statistics statistics; + private boolean closed; + private InputStream wrappedStream = null; + private long contentLength; + private long position; + private long partRemaining; + + public AliyunOSSInputStream(Configuration conf, + AliyunOSSFileSystemStore store, String key, Long contentLength, + Statistics statistics) throws IOException { + this.store = store; + this.key = key; + this.statistics = statistics; + this.contentLength = contentLength; + downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, + MULTIPART_DOWNLOAD_SIZE_DEFAULT); + reopen(0); + closed = false; + } + + /** + * Reopen the wrapped stream at give position, by seeking for + * data of a part length from object content stream. + * + * @param pos position from start of a file + * @throws IOException if failed to reopen + */ + private synchronized void reopen(long pos) throws IOException { + long partSize; + + if (pos < 0) { + throw new EOFException("Cannot seek at negative position:" + pos); + } else if (pos > contentLength) { + throw new EOFException("Cannot seek after EOF, contentLength:" + + contentLength + " position:" + pos); + } else if (pos + downloadPartSize > contentLength) { + partSize = contentLength - pos; + } else { + partSize = downloadPartSize; + } + + if (wrappedStream != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Aborting old stream to open at pos " + pos); + } + wrappedStream.close(); + } + + wrappedStream = store.retrieve(key, pos, pos + partSize -1); + if (wrappedStream == null) { + throw new IOException("Null IO stream"); + } + position = pos; + partRemaining = partSize; + } + + @Override + public synchronized int read() throws IOException { + checkNotClosed(); + + if (partRemaining <= 0 && position < contentLength) { + reopen(position); + } + + int tries = MAX_RETRIES; + boolean retry; + int byteRead = -1; + do { + retry = false; + try { + byteRead = wrappedStream.read(); + } catch (Exception e) { + handleReadException(e, --tries); + retry = true; + } + } while (retry); + if (byteRead >= 0) { + position++; + partRemaining--; + } + + if (statistics != null && byteRead >= 0) { + statistics.incrementBytesRead(byteRead); + } + return byteRead; + } + + + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * + * @throws IOException if the connection is closed. 
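The part-based reopen() above also shapes seek() further down in this class: a short forward seek inside the currently open part just skips bytes on the wrapped stream, while anything else triggers a ranged reopen at the new offset. Illustrative usage; bucket and path are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs =
        FileSystem.get(URI.create("oss://my-bucket/"), new Configuration());
    FSDataInputStream in = fs.open(new Path("/data/big.bin"));
    try {
      in.seek(4096);               // short forward seek: skips within the part
      int first = in.read();
      in.seek(512L * 1024 * 1024); // long jump: stream reopens at the offset
      System.out.println(first + " @" + in.getPos());
    } finally {
      in.close();
    }
  }
}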
+ */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + } + } + + @Override + public synchronized int read(byte[] buf, int off, int len) + throws IOException { + checkNotClosed(); + + if (buf == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > buf.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + int bytesRead = 0; + // Not EOF, and read not done + while (position < contentLength && bytesRead < len) { + if (partRemaining == 0) { + reopen(position); + } + + int tries = MAX_RETRIES; + boolean retry; + int bytes = -1; + do { + retry = false; + try { + bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead); + } catch (Exception e) { + handleReadException(e, --tries); + retry = true; + } + } while (retry); + + if (bytes > 0) { + bytesRead += bytes; + position += bytes; + partRemaining -= bytes; + } else if (partRemaining != 0) { + throw new IOException("Failed to read from stream. Remaining:" + + partRemaining); + } + } + + if (statistics != null && bytesRead > 0) { + statistics.incrementBytesRead(bytesRead); + } + + // Read nothing, but attempt to read something + if (bytesRead == 0 && len > 0) { + return -1; + } else { + return bytesRead; + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + if (wrappedStream != null) { + wrappedStream.close(); + } + } + + @Override + public synchronized int available() throws IOException { + checkNotClosed(); + + long remaining = contentLength - position; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)remaining; + } + + @Override + public synchronized void seek(long pos) throws IOException { + checkNotClosed(); + if (position == pos) { + return; + } else if (pos > position && pos < position + partRemaining) { + long len = pos - position; + AliyunOSSUtils.skipFully(wrappedStream, len); + position = pos; + partRemaining -= len; + } else { + reopen(pos); + } + } + + @Override + public synchronized long getPos() throws IOException { + checkNotClosed(); + return position; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + checkNotClosed(); + return false; + } + + private void handleReadException(Exception e, int tries) throws IOException{ + if (tries == 0) { + throw new IOException(e); + } + + LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" + + " connection at position '" + position + "', " + e.getMessage()); + try { + Thread.sleep(100); + } catch (InterruptedException e2) { + LOG.warn(e2.getMessage()); + } + reopen(position); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java new file mode 100644 index 00000000000..c952d0ae858 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.util.Progressable; + +/** + * The output stream for OSS blob system. + * Data will be buffered on local disk, then uploaded to OSS in + * {@link #close()} method. + */ +public class AliyunOSSOutputStream extends OutputStream { + public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class); + private AliyunOSSFileSystemStore store; + private final String key; + private Statistics statistics; + private Progressable progress; + private long partSizeThreshold; + private LocalDirAllocator dirAlloc; + private boolean closed; + private File tmpFile; + private BufferedOutputStream backupStream; + + public AliyunOSSOutputStream(Configuration conf, + AliyunOSSFileSystemStore store, String key, Progressable progress, + Statistics statistics) throws IOException { + this.store = store; + this.key = key; + // The caller can't get any progress information + this.progress = progress; + this.statistics = statistics; + partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, + MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + + if (conf.get(BUFFER_DIR_KEY) == null) { + conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); + } + dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); + + tmpFile = dirAlloc.createTmpFileForWrite("output-", + LocalDirAllocator.SIZE_UNKNOWN, conf); + backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile)); + closed = false; + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + if (backupStream != null) { + backupStream.close(); + } + long dataLen = tmpFile.length(); + try { + if (dataLen <= partSizeThreshold) { + store.uploadObject(key, tmpFile); + } else { + store.multipartUploadObject(key, tmpFile); + } + } finally { + if (!tmpFile.delete()) { + LOG.warn("Cannot delete file: " + tmpFile); + } + } + } + + @Override + public synchronized void flush() throws IOException { + backupStream.flush(); + } + + @Override + public synchronized void write(int b) throws IOException { + backupStream.write(b); + statistics.incrementBytesWritten(1); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java new file mode 100644 index 00000000000..263b4cf8880 --- /dev/null +++
b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import java.io.IOException; +import java.io.InputStream; + +import com.aliyun.oss.common.auth.CredentialsProvider; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ProviderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Utility methods for Aliyun OSS code. + */ +public final class AliyunOSSUtils { + private static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSUtils.class); + + private AliyunOSSUtils() { + } + + /** + * Used to get a password from the configuration. + * + * @param conf configuration that contains password information + * @param key the key of the password + * @return the value for the key + * @throws IOException if failed to get password from configuration + */ + public static String getValueWithKey(Configuration conf, String key) + throws IOException { + try { + final char[] pass = conf.getPassword(key); + if (pass != null) { + return (new String(pass)).trim(); + } else { + return ""; + } + } catch (IOException ioe) { + throw new IOException("Cannot find password option " + key, ioe); + } + } + + /** + * Skip the requested number of bytes or fail if there are not enough bytes + * left. This allows for the possibility that {@link InputStream#skip(long)} + * may not skip as many bytes as requested (most likely because of reaching + * EOF). + * + * @param is the input stream to skip. + * @param n the number of bytes to skip. + * @throws IOException thrown when fewer bytes were skipped than requested. + */ + public static void skipFully(InputStream is, long n) throws IOException { + long total = 0; + long cur = 0; + + do { + cur = is.skip(n - total); + total += cur; + } while((total < n) && (cur > 0)); + + if (total < n) { + throw new IOException("Failed to skip " + n + " bytes, possibly due " + + "to EOF."); + } + } + + /** + * Calculate a proper size for the multipart pieces. If minPartSize + * is too small, the number of multipart pieces may exceed the limit of + * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}. + * + * @param contentLength the size of file. + * @param minPartSize the minimum size of a multipart piece. + * @return a revised size of the multipart piece.
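The formula documented here keeps the part count under the 10000-part limit: partSize = max(minPartSize, contentLength / limit + 1). A small worked example; the limit and the 100 KB floor repeat the module's defaults purely for illustration:

```java
// Worked example of the part-size formula described above:
//   partSize = max(minPartSize, contentLength / partNumLimit + 1)
public final class PartSizeExample {
  private static final long PART_NUM_LIMIT = 10000;  // mirrors the default

  static long calculatePartSize(long contentLength, long minPartSize) {
    return Math.max(minPartSize, contentLength / PART_NUM_LIMIT + 1);
  }

  public static void main(String[] args) {
    long minPartSize = 100 * 1024;                // 100 KB floor
    long tenMb = 10L * 1024 * 1024;               // small file
    long twoTb = 2L * 1024 * 1024 * 1024 * 1024;  // very large file
    System.out.println(calculatePartSize(tenMb, minPartSize));  // 102400
    System.out.println(calculatePartSize(twoTb, minPartSize));  // 219902326
    // Either way the implied part count stays within PART_NUM_LIMIT.
  }
}
```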
+ */ + public static long calculatePartSize(long contentLength, long minPartSize) { + long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1; + return Math.max(minPartSize, tmpPartSize); + } + + /** + * Create the credential provider specified by the configuration, or the + * default credential provider if none is specified. + * + * @param conf configuration + * @return a credential provider + * @throws IOException on any problem. Class construction issues may be + * nested inside the IOE. + */ + public static CredentialsProvider getCredentialsProvider(Configuration conf) + throws IOException { + CredentialsProvider credentials; + + String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); + if (StringUtils.isEmpty(className)) { + Configuration newConf = + ProviderUtils.excludeIncompatibleCredentialProviders(conf, + AliyunOSSFileSystem.class); + credentials = new AliyunCredentialsProvider(newConf); + } else { + try { + LOG.debug("Credential provider class is: " + className); + Class credClass = Class.forName(className); + try { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor( + Configuration.class).newInstance(conf); + } catch (NoSuchMethodException | SecurityException e) { + credentials = + (CredentialsProvider)credClass.getDeclaredConstructor() + .newInstance(); + } + } catch (ClassNotFoundException e) { + throw new IOException(className + " not found.", e); + } catch (NoSuchMethodException | SecurityException e) { + throw new IOException(String.format("%s constructor exception. A " + + "class specified in %s must provide an accessible constructor " + + "accepting Configuration, or an accessible default " + + "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), + e); + } catch (ReflectiveOperationException | IllegalArgumentException e) { + throw new IOException(className + " instantiation exception.", e); + } + } + + return credentials; + } + + /** + * Turns a path (relative or otherwise) into an OSS key, adding a trailing + * "/" if the path is not the root and does not already have a "/" + * at the end. + * + * @param key OSS key or "" + * @return the key with a trailing "/", or, if it is the root key, "". + */ + public static String maybeAddTrailingSlash(String key) { + if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) { + return key + '/'; + } else { + return key; + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java new file mode 100644 index 00000000000..04a2ccd6c54 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
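For illustration, a user-supplied provider only needs one of the two constructors the reflection lookup above tries. A minimal sketch: the class name and the `my.*` keys are hypothetical; only `fs.oss.credentials.provider` and the SDK types come from the module and its dependency.

```java
// Sketch of a custom provider wired in through the reflection lookup above.
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import org.apache.hadoop.conf.Configuration;

public class MyStaticCredentialsProvider implements CredentialsProvider {
  private volatile Credentials credentials;

  // Matches the Configuration-argument constructor tried first above.
  public MyStaticCredentialsProvider(Configuration conf) {
    credentials = new DefaultCredentials(
        conf.getTrimmed("my.access.key.id"),       // hypothetical keys
        conf.getTrimmed("my.access.key.secret"));
  }

  @Override
  public void setCredentials(Credentials creds) {
    this.credentials = creds;
  }

  @Override
  public Credentials getCredentials() {
    return credentials;
  }
}
```

It would then be selected with `conf.set("fs.oss.credentials.provider", MyStaticCredentialsProvider.class.getName())`.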
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +/** + * All configuration constants for the OSS filesystem. + */ +public final class Constants { + + private Constants() { + } + + // Class of credential provider + public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY = + "fs.oss.credentials.provider"; + + // OSS access verification + public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId"; + public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"; + public static final String SECURITY_TOKEN = "fs.oss.securityToken"; + + // Number of simultaneous connections to OSS + public static final String MAXIMUM_CONNECTIONS_KEY = + "fs.oss.connection.maximum"; + public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32; + + // Connect to OSS over SSL + public static final String SECURE_CONNECTIONS_KEY = + "fs.oss.connection.secure.enabled"; + public static final boolean SECURE_CONNECTIONS_DEFAULT = true; + + // Use a custom endpoint + public static final String ENDPOINT_KEY = "fs.oss.endpoint"; + + // Connect to OSS through a proxy server + public static final String PROXY_HOST_KEY = "fs.oss.proxy.host"; + public static final String PROXY_PORT_KEY = "fs.oss.proxy.port"; + public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username"; + public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password"; + public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain"; + public static final String PROXY_WORKSTATION_KEY = + "fs.oss.proxy.workstation"; + + // Number of times we should retry errors + public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum"; + public static final int MAX_ERROR_RETRIES_DEFAULT = 20; + + // Time until we give up trying to establish a connection to OSS + public static final String ESTABLISH_TIMEOUT_KEY = + "fs.oss.connection.establish.timeout"; + public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000; + + // Time until we give up on a connection to OSS + public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout"; + public static final int SOCKET_TIMEOUT_DEFAULT = 200000; + + // Number of records to get while paging through a directory listing + public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum"; + public static final int MAX_PAGING_KEYS_DEFAULT = 1000; + + // Size of each multipart piece in bytes + public static final String MULTIPART_UPLOAD_SIZE_KEY = + "fs.oss.multipart.upload.size"; + + public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024; + public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000; + + // Minimum size in bytes before we start a multipart upload or copy + public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY = + "fs.oss.multipart.upload.threshold"; + public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT = + 20 * 1024 * 1024; + + public static final String MULTIPART_DOWNLOAD_SIZE_KEY = + "fs.oss.multipart.download.size"; + + public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024; + + // Comma separated list of directories + public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; + + // private | public-read | public-read-write + public static final String CANNED_ACL_KEY = "fs.oss.acl.default"; + public static final String CANNED_ACL_DEFAULT = ""; + + // OSS server-side encryption + public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
"fs.oss.server-side-encryption-algorithm"; + + public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; + public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; + public static final String FS_OSS = "oss"; + + public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; + public static final int MAX_RETRIES = 10; + +} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java new file mode 100644 index 00000000000..234567b2b00 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Aliyun OSS Filesystem. + */ +package org.apache.hadoop.fs.aliyun.oss; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md new file mode 100644 index 00000000000..62e650506ff --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md @@ -0,0 +1,294 @@ + + +# Hadoop-Aliyun module: Integration with Aliyun Web Services + + + +## Overview + +The `hadoop-aliyun` module provides support for Aliyun integration with +[Aliyun Object Storage Service (Aliyun OSS)](https://www.aliyun.com/product/oss). +The generated JAR file, `hadoop-aliyun.jar` also declares a transitive +dependency on all external artifacts which are needed for this support — enabling +downstream applications to easily use this support. + +To make it part of Apache Hadoop's default classpath, simply make sure +that HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aliyun' in the list. + +### Features + +* Read and write data stored in Aliyun OSS. +* Present a hierarchical file system view by implementing the standard Hadoop +[`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface. +* Can act as a source of data in a MapReduce job, or a sink. + +### Warning #1: Object Stores are not filesystems. + +Aliyun OSS is an example of "an object store". In order to achieve scalability +and especially high availability, Aliyun OSS has relaxed some of the constraints +which classic "POSIX" filesystems promise. + + + +Specifically + +1. Atomic operations: `delete()` and `rename()` are implemented by recursive +file-by-file operations. They take time at least proportional to the number of files, +during which time partial updates may be visible. `delete()` and `rename()` +can not guarantee atomicity. If the operations are interrupted, the filesystem +is left in an intermediate state. +2. 
File owner and group are persisted, but the permissions model is not enforced. +Authorization occurs at the level of the entire Aliyun account via +[Aliyun Resource Access Management (Aliyun RAM)](https://www.aliyun.com/product/ram). +3. Directory last access time is not tracked. +4. The append operation is not supported. + +### Warning #2: Directory last access time is not tracked + +Features of Hadoop relying on this can have unexpected behaviour; e.g. the +AggregatedLogDeletionService of YARN will not remove the appropriate logfiles. + +### Warning #3: Your Aliyun credentials are valuable + +Your Aliyun credentials not only pay for services, they offer read and write +access to the data. Anyone with the account can not only read your datasets +—they can delete them. + +Do not inadvertently share these credentials through means such as +1. Checking in to SCM any configuration files containing the secrets. +2. Logging them to a console, as they invariably end up being seen. +3. Defining filesystem URIs with the credentials in the URL, such as +`oss://accessKeyId:accessKeySecret@directory/file`. They will end up in +logs and error messages. +4. Including the secrets in bug reports. + +If you do any of these: change your credentials immediately! + +### Warning #4: The Aliyun OSS client provided by Aliyun E-MapReduce is different from this implementation + +Specifically: on Aliyun E-MapReduce, `oss://` is also supported but with +a different implementation. If you are using Aliyun E-MapReduce, +follow these instructions —and be aware that all issues related to Aliyun +OSS integration in E-MapReduce can only be addressed by Aliyun themselves: +please raise your issues with them. + +## OSS + +### Authentication properties + + + fs.oss.accessKeyId + Aliyun access key ID + + + + fs.oss.accessKeySecret + Aliyun access key secret + + + + fs.oss.credentials.provider + + Class name of a credentials provider that implements + com.aliyun.oss.common.auth.CredentialsProvider. Omit if using access/secret keys + or another authentication mechanism. The specified class must provide an + accessible constructor accepting org.apache.hadoop.conf.Configuration, + or an accessible default constructor. + + + +### Other properties + + + fs.oss.endpoint + Aliyun OSS endpoint to connect to. An up-to-date list is + provided in the Aliyun OSS Documentation. + + + + + fs.oss.proxy.host + Hostname of the (optional) proxy server for Aliyun OSS connections + + + + fs.oss.proxy.port + Proxy server port + + + + fs.oss.proxy.username + Username for authenticating with proxy server + + + + fs.oss.proxy.password + Password for authenticating with proxy server. + + + + fs.oss.proxy.domain + Domain for authenticating with proxy server. + + + + fs.oss.proxy.workstation + Workstation for authenticating with proxy server. + + + + fs.oss.attempts.maximum + 20 + How many times we should retry commands on transient errors. + + + + fs.oss.connection.establish.timeout + 50000 + Connection setup timeout in milliseconds. + + + + fs.oss.connection.timeout + 200000 + Socket connection timeout in milliseconds. + + + + fs.oss.paging.maximum + 1000 + How many keys to request from Aliyun OSS at a time when doing directory listings. + + + + + fs.oss.multipart.upload.size + 10485760 + Size of each multipart piece in bytes. + + + + fs.oss.multipart.upload.threshold + 20971520 + Minimum size in bytes before we start a multipart upload or copy.
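As a usage sketch of the properties above: the endpoint value is taken from the contract-test example later in this document, while the bucket name and key values are placeholders. Per Warning #3, prefer a credential store over plain-text keys (see the follow-up sketch in the testing section).

```java
// Minimal sketch: wiring the properties documented above into a client.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // fs.oss.impl as in the contract-test example later in this document.
    conf.set("fs.oss.impl",
        "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
    conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
    conf.set("fs.oss.accessKeyId", "YOUR_ACCESS_KEY_ID");          // placeholder
    conf.set("fs.oss.accessKeySecret", "YOUR_ACCESS_KEY_SECRET");  // placeholder
    conf.setInt("fs.oss.attempts.maximum", 20);  // default shown above

    FileSystem fs = FileSystem.get(URI.create("oss://your-bucket/"), conf);
    for (FileStatus status : fs.listStatus(new Path("/"))) {
      System.out.println(status.getPath());
    }
    fs.close();
  }
}
```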
+ + + + fs.oss.multipart.download.size + 102400 + Size in bytes of each request from Aliyun OSS. + + + + fs.oss.buffer.dir + Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS + + + + fs.oss.acl.default + + Set a canned ACL for the bucket. Value may be private, public-read, public-read-write. + + + + + fs.oss.server-side-encryption-algorithm + + Specify a server-side encryption algorithm for oss: file system. + Unset by default, and the only other currently allowable value is AES256. + + + + + fs.oss.connection.maximum + 32 + Number of simultaneous connections to OSS. + + + + fs.oss.connection.secure.enabled + true + Connect to OSS over SSL or not; true by default. + + +## Testing the hadoop-aliyun Module + +To test the `oss://` filesystem client, two files which pass authentication +details to the test runner are needed. + +1. `auth-keys.xml` +2. `core-site.xml` + +Those two configuration files must be put into +`hadoop-tools/hadoop-aliyun/src/test/resources`. + +### `core-site.xml` + +This file pre-exists and sources the configurations created in `auth-keys.xml`. + +For most cases, no modification is needed, unless a specific, non-default property +needs to be set during the testing. + +### `auth-keys.xml` + +This file triggers the testing of the Aliyun OSS module. Without this file, +*none of the tests in this module will be executed*. + +It contains the access key ID/secret and proxy information that are needed to +connect to Aliyun OSS, and an OSS bucket URL should also be provided. + +1. `test.fs.oss.name` : the URL of the bucket for Aliyun OSS tests + +The contents of the bucket will be cleaned during the testing process, so +do not use the bucket for any purpose other than testing. + +### Run Hadoop contract tests +Create the file `contract-test-options.xml` under `src/test/resources`. If the +`fs.contract.test.fs.oss` test path is not defined in that file, those +tests will be skipped. Credentials are also needed to run any of those +tests; they can be copied from `auth-keys.xml` or pulled in through direct +XInclude inclusion. Here is an example of `contract-test-options.xml`: + + + + + + + + + fs.contract.test.fs.oss + oss://spark-tests + + + + fs.oss.impl + org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem + + + + fs.oss.endpoint + oss-cn-hangzhou.aliyuncs.com + + + + fs.oss.buffer.dir + /tmp/oss + + + + fs.oss.multipart.download.size + 102400 + + diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java new file mode 100644 index 00000000000..901cb2bd082 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
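In line with Warning #3, secrets do not have to live in plain-text XML: the module reads them through `Configuration.getPassword()` (see `AliyunOSSUtils.getValueWithKey()` earlier in this patch), which also consults Hadoop credential stores. A minimal sketch; the jceks path is a placeholder for a store created with the standard `hadoop credential` tooling:

```java
// Sketch: resolving an OSS secret from a Hadoop credential store instead
// of plain-text configuration. Only the property names are real; the
// store location is a placeholder.
import org.apache.hadoop.conf.Configuration;

public class CredentialLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.credential.provider.path",
        "jceks://file/etc/hadoop/oss.jceks");  // placeholder store location
    char[] secret = conf.getPassword("fs.oss.accessKeySecret");
    System.out.println(secret == null ? "not configured" : "secret resolved");
  }
}
```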
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.internal.AssumptionViolatedException; + +import java.io.IOException; +import java.net.URI; + +/** + * Utility class for Aliyun OSS Tests. + */ +public final class AliyunOSSTestUtils { + + private AliyunOSSTestUtils() { + } + + /** + * Create the test filesystem. + * + * If the test.fs.oss.name property is not set, + * the tests will be skipped via an AssumptionViolatedException. + * + * @param conf configuration + * @return the FS + * @throws IOException if the filesystem cannot be initialized + */ + public static AliyunOSSFileSystem createTestFileSystem(Configuration conf) + throws IOException { + String fsname = conf.getTrimmed( + TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, ""); + + boolean liveTest = StringUtils.isNotEmpty(fsname); + URI testURI = null; + if (liveTest) { + testURI = URI.create(fsname); + liveTest = testURI.getScheme().equals(Constants.FS_OSS); + } + + if (!liveTest) { + throw new AssumptionViolatedException("No test filesystem in " + + TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME); + } + AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem(); + ossfs.initialize(testURI, conf); + return ossfs; + } + + /** + * Generate a unique test path for multi-user parallel tests. + * + * @return root test path + */ + public static String generateUniqueTestPath() { + String testUniqueForkId = System.getProperty("test.unique.fork.id"); + return testUniqueForkId == null ? "/test" : + "/" + testUniqueForkId + "/test"; + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java new file mode 100644 index 00000000000..e08a4dccfca --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
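A test built on the helper above inherits its skip-or-run behaviour; a minimal sketch, where the test class is hypothetical but the helpers are from this module:

```java
// Sketch of a live test using AliyunOSSTestUtils above; skipped via
// AssumptionViolatedException when test.fs.oss.name is unset.
package org.apache.hadoop.fs.aliyun.oss;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Test;

public class ExampleLiveOSSTest {
  @Test
  public void testMkdirsProbe() throws Exception {
    Configuration conf = new Configuration();
    AliyunOSSFileSystem fs = AliyunOSSTestUtils.createTestFileSystem(conf);
    Path probe = new Path(AliyunOSSTestUtils.generateUniqueTestPath(), "probe");
    try {
      Assert.assertTrue(fs.mkdirs(probe));
      Assert.assertTrue(fs.exists(probe));
    } finally {
      fs.delete(probe, true);  // clean up the test bucket
      fs.close();
    }
  }
}
```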
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.common.auth.Credentials; +import com.aliyun.oss.common.auth.InvalidCredentialsException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID; +import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET; +import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN; + +/** + * Tests use of temporary credentials (for example, Aliyun STS & Aliyun OSS). + * This test extends a class that "does things to the root directory", and + * should only be used against transient filesystems where you don't care about + * the data. + */ +public class TestAliyunCredentials extends AbstractFSContractTestBase { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } + + @Test + public void testCredentialMissingAccessKeyId() throws Throwable { + Configuration conf = new Configuration(); + conf.set(ACCESS_KEY_ID, ""); + conf.set(ACCESS_KEY_SECRET, "accessKeySecret"); + conf.set(SECURITY_TOKEN, "token"); + validateCredential(conf); + } + + @Test + public void testCredentialMissingAccessKeySecret() throws Throwable { + Configuration conf = new Configuration(); + conf.set(ACCESS_KEY_ID, "accessKeyId"); + conf.set(ACCESS_KEY_SECRET, ""); + conf.set(SECURITY_TOKEN, "token"); + validateCredential(conf); + } + + private void validateCredential(Configuration conf) { + try { + AliyunCredentialsProvider provider + = new AliyunCredentialsProvider(conf); + Credentials credentials = provider.getCredentials(); + fail("Expected an InvalidCredentialsException, got " + credentials); + } catch (InvalidCredentialsException expected) { + // expected + } catch (IOException e) { + fail("Unexpected exception: " + e); + } + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java new file mode 100644 index 00000000000..19120a62e22 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.Path; + +import org.junit.Before; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeNotNull; +import static org.junit.Assume.assumeTrue; + +/** + * Tests a live Aliyun OSS system. + */ +public class TestAliyunOSSFileSystemContract + extends FileSystemContractBaseTest { + public static final String TEST_FS_OSS_NAME = "test.fs.oss.name"; + private static Path testRootPath = + new Path(AliyunOSSTestUtils.generateUniqueTestPath()); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fs = AliyunOSSTestUtils.createTestFileSystem(conf); + assumeNotNull(fs); + } + + public Path getTestBaseDir() { + return testRootPath; + } + + @Test + public void testMkdirsWithUmask() throws Exception { + // not supported + } + + @Test + public void testRootDirAlwaysExists() throws Exception { + //this will throw an exception if the path is not found + fs.getFileStatus(super.path("/")); + //this catches overrides of the base exists() method that don't + //use getFileStatus() as an existence probe + assertTrue("FileSystem.exists() fails for root", + fs.exists(super.path("/"))); + } + + @Test + public void testRenameRootDirForbidden() throws Exception { + assumeTrue(renameSupported()); + rename(super.path("/"), + super.path("/test/newRootDir"), + false, true, false); + } + + @Test + public void testDeleteSubdir() throws IOException { + Path parentDir = this.path("/test/hadoop"); + Path file = this.path("/test/hadoop/file"); + Path subdir = this.path("/test/hadoop/subdir"); + this.createFile(file); + + assertTrue("Created subdir", this.fs.mkdirs(subdir)); + assertTrue("File exists", this.fs.exists(file)); + assertTrue("Parent dir exists", this.fs.exists(parentDir)); + assertTrue("Subdir exists", this.fs.exists(subdir)); + + assertTrue("Deleted subdir", this.fs.delete(subdir, true)); + assertTrue("Parent should exist", this.fs.exists(parentDir)); + + assertTrue("Deleted file", this.fs.delete(file, false)); + assertTrue("Parent should exist", this.fs.exists(parentDir)); + } + + + @Override + protected boolean renameSupported() { + return true; + } + + @Test + public void testRenameNonExistentPath() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/path"); + Path dst = this.path("/test/new/newpath"); + try { + super.rename(src, dst, false, false, false); + fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + + @Test + public void testRenameFileMoveToNonExistentDirectory() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/file"); + this.createFile(src); + Path dst = this.path("/test/new/newfile"); + try { + super.rename(src, dst, false, true, false); + fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + + @Test + public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/dir"); + this.fs.mkdirs(src); + Path dst = 
this.path("/test/new/newdir"); + try { + super.rename(src, dst, false, true, false); + fail("Should throw FileNotFoundException!"); + } catch (FileNotFoundException e) { + // expected + } + } + + @Test + public void testRenameFileMoveToExistingDirectory() throws Exception { + super.testRenameFileMoveToExistingDirectory(); + } + + @Test + public void testRenameFileAsExistingFile() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/file"); + this.createFile(src); + Path dst = this.path("/test/new/newfile"); + this.createFile(dst); + try { + super.rename(src, dst, false, true, true); + fail("Should throw FileAlreadyExistsException"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + + @Test + public void testRenameDirectoryAsExistingFile() throws Exception { + assumeTrue(renameSupported()); + Path src = this.path("/test/hadoop/dir"); + this.fs.mkdirs(src); + Path dst = this.path("/test/new/newfile"); + this.createFile(dst); + try { + super.rename(src, dst, false, true, true); + fail("Should throw FileAlreadyExistsException"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + + @Test + public void testGetFileStatusFileAndDirectory() throws Exception { + Path filePath = this.path("/test/oss/file1"); + this.createFile(filePath); + assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile()); + assertFalse("Should not be directory", + this.fs.getFileStatus(filePath).isDirectory()); + + Path dirPath = this.path("/test/oss/dir"); + this.fs.mkdirs(dirPath); + assertTrue("Should be directory", + this.fs.getFileStatus(dirPath).isDirectory()); + assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile()); + + Path parentPath = this.path("/test/oss"); + for (FileStatus fileStatus: fs.listStatus(parentPath)) { + assertTrue("file and directory should be new", + fileStatus.getModificationTime() > 0L); + } + } + + @Test + public void testMkdirsForExistingFile() throws Exception { + Path testFile = this.path("/test/hadoop/file"); + assertFalse(this.fs.exists(testFile)); + this.createFile(testFile); + assertTrue(this.fs.exists(testFile)); + try { + this.fs.mkdirs(testFile); + fail("Should throw FileAlreadyExistsException!"); + } catch (FileAlreadyExistsException e) { + // expected + } + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java new file mode 100644 index 00000000000..7f4bac25642 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.security.DigestInputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + +/** + * Test the bridging logic between Hadoop's abstract filesystem and + * Aliyun OSS. + */ +public class TestAliyunOSSFileSystemStore { + private Configuration conf; + private AliyunOSSFileSystemStore store; + private AliyunOSSFileSystem fs; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + fs = new AliyunOSSFileSystem(); + fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf); + store = fs.getStore(); + } + + @After + public void tearDown() throws Exception { + try { + store.purge("test"); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + @BeforeClass + public static void checkSettings() throws Exception { + Configuration conf = new Configuration(); + assumeNotNull(conf.get(Constants.ACCESS_KEY_ID)); + assumeNotNull(conf.get(Constants.ACCESS_KEY_SECRET)); + assumeNotNull(conf.get("test.fs.oss.name")); + } + + protected void writeRenameReadCompare(Path path, long len) + throws IOException, NoSuchAlgorithmException { + // If len > fs.oss.multipart.upload.threshold, + // we'll use a multipart upload copy + MessageDigest digest = MessageDigest.getInstance("MD5"); + OutputStream out = new BufferedOutputStream( + new DigestOutputStream(fs.create(path, false), digest)); + for (long i = 0; i < len; i++) { + out.write('Q'); + } + out.flush(); + out.close(); + + assertTrue("Exists", fs.exists(path)); + + Path copyPath = path.suffix(".copy"); + fs.rename(path, copyPath); + + assertTrue("Copy exists", fs.exists(copyPath)); + + // Download file from Aliyun OSS and compare the digest against the original + MessageDigest digest2 = MessageDigest.getInstance("MD5"); + InputStream in = new BufferedInputStream( + new DigestInputStream(fs.open(copyPath), digest2)); + long copyLen = 0; + while (in.read() != -1) { + copyLen++; + } + in.close(); + + assertEquals("Copy length matches original", len, copyLen); + assertArrayEquals("Digests match", digest.digest(), digest2.digest()); + } + + @Test + public void testSmallUpload() throws IOException, NoSuchAlgorithmException { + // Regular upload, regular copy + writeRenameReadCompare(new Path("/test/small"), 16384); + } + + @Test + public void testLargeUpload() + throws IOException, NoSuchAlgorithmException { + // Multipart upload, multipart copy + writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50 MB + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java new file mode 100644 index 00000000000..d798cafb206 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java @@ -0,0
+1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.FileStatus; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +import static org.junit.Assert.assertTrue; + +/** + * Tests basic functionality for AliyunOSSInputStream, including seeking and + * reading files. + */ +public class TestAliyunOSSInputStream { + + private FileSystem fs; + + private static final Logger LOG = + LoggerFactory.getLogger(TestAliyunOSSInputStream.class); + + private static String testRootPath = + AliyunOSSTestUtils.generateUniqueTestPath(); + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fs = AliyunOSSTestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(new Path(testRootPath), true); + } + } + + private Path setPath(String path) { + if (path.startsWith("/")) { + return new Path(testRootPath + path); + } else { + return new Path(testRootPath + "/" + path); + } + } + + @Test + public void testSeekFile() throws Exception { + Path smallSeekFile = setPath("/test/smallSeekFile.txt"); + long size = 5 * 1024 * 1024; + + ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255); + LOG.info("5MB file created: smallSeekFile.txt"); + + FSDataInputStream instream = this.fs.open(smallSeekFile); + int seekTimes = 5; + LOG.info("Multiple-fold position seeking test:"); + for (int i = 0; i < seekTimes; i++) { + long pos = size / (seekTimes - i) - 1; + LOG.info("begin seeking for pos: " + pos); + instream.seek(pos); + assertTrue("expected position at:" + pos + ", but got:" + + instream.getPos(), instream.getPos() == pos); + LOG.info("completed seeking at pos: " + instream.getPos()); + } + LOG.info("Random position seeking test:"); + Random rand = new Random(); + for (int i = 0; i < seekTimes; i++) { + long pos = (rand.nextLong() & Long.MAX_VALUE) % size; // non-negative + instream.seek(pos); + LOG.info("begin seeking for pos: " + pos); + assertTrue("expected position at:" + pos + ", but got:" + + instream.getPos(), instream.getPos() == pos); + LOG.info("completed seeking at pos: " + instream.getPos()); + } + IOUtils.closeStream(instream); + } + + @Test + public void testReadFile()
throws Exception { + final int bufLen = 256; + final int sizeFlag = 5; + String filename = "readTestFile_" + sizeFlag + ".txt"; + Path readTestFile = setPath("/test/" + filename); + long size = sizeFlag * 1024 * 1024; + + ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255); + LOG.info(sizeFlag + "MB file created: /test/" + filename); + + FSDataInputStream instream = this.fs.open(readTestFile); + byte[] buf = new byte[bufLen]; + long bytesRead = 0; + while (bytesRead < size) { + int bytes; + if (size - bytesRead < bufLen) { + int remaining = (int)(size - bytesRead); + bytes = instream.read(buf, 0, remaining); + } else { + bytes = instream.read(buf, 0, bufLen); + } + bytesRead += bytes; + + if (bytesRead % (1024 * 1024) == 0) { + int available = instream.available(); + int remaining = (int)(size - bytesRead); + assertTrue("expected remaining:" + remaining + ", but got:" + available, + remaining == available); + LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024)) + + " MB"); + } + } + assertTrue(instream.available() == 0); + IOUtils.closeStream(instream); + } + + @Test + public void testDirectoryModifiedTime() throws Exception { + Path emptyDirPath = setPath("/test/emptyDirectory"); + fs.mkdirs(emptyDirPath); + FileStatus dirFileStatus = fs.getFileStatus(emptyDirPath); + assertTrue("expected the empty dir is new", + dirFileStatus.getModificationTime() > 0L); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java new file mode 100644 index 00000000000..6b87d9ca466 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.io.IOException; + +/** + * Tests regular and multi-part upload functionality for AliyunOSSOutputStream. 
+ */ +public class TestAliyunOSSOutputStream { + private FileSystem fs; + private static String testRootPath = + AliyunOSSTestUtils.generateUniqueTestPath(); + + @Rule + public Timeout testTimeout = new Timeout(30 * 60 * 1000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024); + fs = AliyunOSSTestUtils.createTestFileSystem(conf); + } + + @After + public void tearDown() throws Exception { + if (fs != null) { + fs.delete(new Path(testRootPath), true); + } + } + + protected Path getTestPath() { + return new Path(testRootPath + "/test-aliyun-oss"); + } + + @Test + public void testRegularUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); + } + + @Test + public void testMultiPartUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); + } + + @Test + public void testMultiPartUploadLimit() throws IOException { + long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024); + assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); + + long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024); + assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); + + long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024, + 100 * 1024); + assert(10000 * 100 * 1024 / partSize3 + < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); + + long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024, + 100 * 1024); + assert(10001 * 100 * 1024 / partSize4 + < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java new file mode 100644 index 00000000000..624c606c6b8 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractBondedFSContract; + +/** + * The contract of Aliyun OSS: only enabled if the test bucket is provided. 
+ */ +public class AliyunOSSContract extends AbstractBondedFSContract { + + public static final String CONTRACT_XML = "contract/aliyun-oss.xml"; + + public AliyunOSSContract(Configuration conf) { + super(conf); + //insert the base features + addConfResource(CONTRACT_XML); + } + + @Override + public String getScheme() { + return "oss"; + } + + @Override + public Path getTestPath() { + String testUniqueForkId = System.getProperty("test.unique.fork.id"); + return testUniqueForkId == null ? super.getTestPath() : + new Path("/" + testUniqueForkId, "test"); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java new file mode 100644 index 00000000000..88dd8cd2267 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Aliyun OSS contract creating tests. + */ +public class TestAliyunOSSContractCreate extends AbstractContractCreateTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java new file mode 100644 index 00000000000..1658d806831 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Aliyun OSS contract deleting tests. + */ +public class TestAliyunOSSContractDelete extends AbstractContractDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java new file mode 100644 index 00000000000..18d09d54149 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; + +import static org.apache.hadoop.fs.aliyun.oss.Constants.*; + +/** + * Contract test suite covering Aliyun OSS integration with DistCp. + */ +public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest { + + private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB + + @Override + protected Configuration createConfiguration() { + Configuration newConf = super.createConfiguration(); + newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); + newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING); + return newConf; + } + + @Override + protected AliyunOSSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java new file mode 100644 index 00000000000..c69124d9243 --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Test getFileStatus and related listing operations. + */ +public class TestAliyunOSSContractGetFileStatus + extends AbstractContractGetFileStatusTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } + +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java new file mode 100644 index 00000000000..6cb754975ca --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss.contract; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Aliyun OSS contract directory tests. + */ +public class TestAliyunOSSContractMkdir extends AbstractContractMkdirTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new AliyunOSSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java new file mode 100644 index 00000000000..099aba6296f --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java
new file mode 100644
index 00000000000..099aba6296f
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract opening file tests.
+ */
+public class TestAliyunOSSContractOpen extends AbstractContractOpenTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new AliyunOSSContract(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java
new file mode 100644
index 00000000000..e15b3ba30de
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Aliyun OSS contract renaming tests.
+ */
+public class TestAliyunOSSContractRename extends AbstractContractRenameTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new AliyunOSSContract(conf);
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java
new file mode 100644
index 00000000000..9faae374521
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Root dir operations against an Aliyun OSS bucket.
+ */
+public class TestAliyunOSSContractRootDir extends
+    AbstractContractRootDirectoryTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestAliyunOSSContractRootDir.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new AliyunOSSContract(conf);
+  }
+
+  @Override
+  public void testListEmptyRootDirectory() throws IOException {
+    for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) {
+      try {
+        super.testListEmptyRootDirectory();
+        break;
+      } catch (AssertionError | FileNotFoundException e) {
+        if (attempt < maxAttempts) {
+          LOG.info("Attempt {} of {} for empty root directory test failed. "
+              + "Attempting retry.", attempt, maxAttempts);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e2) {
+            Thread.currentThread().interrupt();
+            fail("Test interrupted.");
+            break; // unreachable: fail() always throws AssertionError
+          }
+        } else {
+          LOG.error(
+              "Empty root directory test failed after {} attempts. "
+              + "Failing test.", maxAttempts);
+          throw e;
+        }
+      }
+    }
+  }
+}
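The override above retries because blobstore listings can lag deletions: immediately after the root is cleaned, a LIST may still report the removed objects, so the test tolerates up to ten stale listings at one-second intervals before failing. The same pattern, distilled into a hypothetical stand-alone helper (not part of this patch):

public final class Eventually {
  // Retry an eventually-consistent assertion with a bounded number of
  // attempts and a fixed pause between them.
  static void eventually(int maxAttempts, long sleepMillis, Runnable assertion)
      throws InterruptedException {
    for (int attempt = 1; ; attempt++) {
      try {
        assertion.run();          // throws AssertionError on failure
        return;
      } catch (AssertionError e) {
        if (attempt >= maxAttempts) {
          throw e;                // out of attempts: surface the failure
        }
        Thread.sleep(sleepMillis);
      }
    }
  }
}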
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java
new file mode 100644
index 00000000000..d9b367440a2
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Aliyun OSS contract seeking tests.
+ */
+public class TestAliyunOSSContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new AliyunOSSContract(conf);
+  }
+
+  @Test
+  public void testSeekBeyondDownloadSize() throws Throwable {
+    describe("seek and read beyond download size.");
+
+    Path byteFile = path("byte_file.txt");
+    // 'fs.oss.multipart.download.size' is 100 * 1024 in contract/aliyun-oss.xml
+    byte[] block = dataset(100 * 1024 + 10, 0, 255);
+    FileSystem fs = getFileSystem();
+    createFile(fs, byteFile, true, block);
+
+    // try-with-resources closes the stream even if an assertion fails
+    try (FSDataInputStream instream = fs.open(byteFile)) {
+      instream.seek(100 * 1024 - 1);
+      assertEquals(100 * 1024 - 1, instream.getPos());
+      assertEquals(144, instream.read());
+      instream.seek(100 * 1024 + 1);
+      assertEquals(100 * 1024 + 1, instream.getPos());
+      assertEquals(146, instream.read());
+    }
+  }
+}
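The expected values 144 and 146 follow from the fill pattern of ContractTestUtils.dataset(length, base, modulo), which stores (base + i) % modulo at offset i, here simply i % 255. The two reads land just before and just after the 100 * 1024 download boundary, so they cross from the first downloaded range into the second:

// Expected bytes around the 100 * 1024 boundary, assuming dataset(len, 0, 255)
// writes (byte) (i % 255) at offset i.
int before = (100 * 1024 - 1) % 255; // 102399 % 255 = 144
int after = (100 * 1024 + 1) % 255;  // 102401 % 255 = 146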
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml
new file mode 100644
index 00000000000..9ec4be6c8af
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-blobstore</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-source-missing</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.oss.multipart.download.size</name>
+    <value>102400</value>
+  </property>
+
+  <property>
+    <name>fs.contract.create-visibility-delayed</name>
+    <value>true</value>
+  </property>
+</configuration>
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml
new file mode 100644
index 00000000000..fa4118c2162
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<configuration>
+
+  <property>
+    <name>hadoop.tmp.dir</name>
+    <value>target/build/test</value>
+    <description>A base for other temporary directories.</description>
+    <final>true</final>
+  </property>
+
+  <!-- Turn security off for tests by default -->
+  <property>
+    <name>hadoop.security.authentication</name>
+    <value>simple</value>
+  </property>
+
+  <!--
+  To run these tests:
+
+  # Create a file auth-keys.xml - DO NOT ADD TO REVISION CONTROL
+  # Add the property test.fs.oss.name to point to an OSS filesystem URL
+  # Add the credentials for the service you are testing against
+  -->
+  <include xmlns="http://www.w3.org/2001/XInclude"
+           href="auth-keys.xml">
+    <fallback/>
+  </include>
+
+</configuration>
diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties
new file mode 100644
index 00000000000..bb5cbe5ec32
--- /dev/null
+++ b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=INFO,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml
index 3ecc51b1421..3af7aee392f 100644
--- a/hadoop-tools/hadoop-tools-dist/pom.xml
+++ b/hadoop-tools/hadoop-tools-dist/pom.xml
@@ -100,6 +100,12 @@
       <scope>compile</scope>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aliyun</artifactId>
+      <scope>compile</scope>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-sls</artifactId>
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index e307a400926..117d12c0914 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -48,6 +48,7 @@
     <module>hadoop-aws</module>
     <module>hadoop-azure</module>
    <module>hadoop-azure-datalake</module>
+    <module>hadoop-aliyun</module>
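These tests only run against a live bucket: the core-site.xml above optionally XIncludes an auth-keys.xml carrying the test filesystem URL and credentials. A hypothetical smoke check under that assumption; the test.fs.oss.name key and the class name are illustrative, taken from the comment in core-site.xml rather than from code in this patch:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssSmokeCheck {
  public static void main(String[] args) throws Exception {
    // Loads core-site.xml, which pulls in auth-keys.xml when present.
    Configuration conf = new Configuration();
    String testFs = conf.get("test.fs.oss.name");
    if (testFs == null) {
      System.err.println("auth-keys.xml not found; OSS tests would be skipped");
      return;
    }
    FileSystem fs = FileSystem.get(URI.create(testFs), conf);
    System.out.println(fs.getFileStatus(new Path("/")));
  }
}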