diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 4bb640fd37d..4eac7d5314e 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -483,11 +483,7 @@ hadoop-aws ${project.version} - - org.apache.hadoop - hadoop-aliyun - ${project.version} - + org.apache.hadoop hadoop-kms @@ -1082,22 +1078,6 @@ 2.9.1 - - com.aliyun.oss - aliyun-sdk-oss - 2.8.1 - - - org.apache.httpcomponents - httpclient - - - commons-beanutils - commons-beanutils - - - - org.apache.curator curator-recipes diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml deleted file mode 100644 index 40d78d0cd6c..00000000000 --- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml deleted file mode 100644 index 357786b5b3d..00000000000 --- a/hadoop-tools/hadoop-aliyun/pom.xml +++ /dev/null @@ -1,147 +0,0 @@ - - - - 4.0.0 - - org.apache.hadoop - hadoop-project - 2.9.1-SNAPSHOT - ../../hadoop-project - - hadoop-aliyun - Apache Hadoop Aliyun OSS support - jar - - - UTF-8 - true - - - - - tests-off - - - src/test/resources/auth-keys.xml - - - - true - - - - tests-on - - - src/test/resources/auth-keys.xml - - - - false - - - - - - - - org.codehaus.mojo - findbugs-maven-plugin - - true - true - ${basedir}/dev-support/findbugs-exclude.xml - - Max - - - - org.apache.maven.plugins - maven-surefire-plugin - - 3600 - - - - org.apache.maven.plugins - maven-dependency-plugin - - - deplist - compile - - list - - - - ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt - - - - - - - - - - junit - junit - test - - - - com.aliyun.oss - aliyun-sdk-oss - compile - - - - org.apache.hadoop - hadoop-common - compile - - - - org.apache.hadoop - hadoop-common - test - test-jar - - - org.apache.hadoop - hadoop-distcp - test - - - org.apache.hadoop - hadoop-distcp - ${project.version} - test - test-jar - - - org.apache.hadoop - hadoop-yarn-server-tests - test - test-jar - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - test - - - diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java deleted file mode 100644 index b46c67aa5e7..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import com.aliyun.oss.common.auth.Credentials; -import com.aliyun.oss.common.auth.CredentialsProvider; -import com.aliyun.oss.common.auth.DefaultCredentials; -import com.aliyun.oss.common.auth.InvalidCredentialsException; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Support session credentials for authenticating with Aliyun. - */ -public class AliyunCredentialsProvider implements CredentialsProvider { - private Credentials credentials = null; - - public AliyunCredentialsProvider(Configuration conf) - throws IOException { - String accessKeyId; - String accessKeySecret; - String securityToken; - try { - accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID); - accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET); - } catch (IOException e) { - throw new InvalidCredentialsException(e); - } - - try { - securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN); - } catch (IOException e) { - securityToken = null; - } - - if (StringUtils.isEmpty(accessKeyId) - || StringUtils.isEmpty(accessKeySecret)) { - throw new InvalidCredentialsException( - "AccessKeyId and AccessKeySecret should not be null or empty."); - } - - if (StringUtils.isNotEmpty(securityToken)) { - credentials = new DefaultCredentials(accessKeyId, accessKeySecret, - securityToken); - } else { - credentials = new DefaultCredentials(accessKeyId, accessKeySecret); - } - } - - @Override - public void setCredentials(Credentials creds) { - if (creds == null) { - throw new InvalidCredentialsException("Credentials should not be null."); - } - - credentials = creds; - } - - @Override - public Credentials getCredentials() { - if (credentials == null) { - throw new InvalidCredentialsException("Invalid credentials"); - } - - return credentials; - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java deleted file mode 100644 index 3561b0241ea..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ /dev/null @@ -1,608 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathIOException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; - -import com.aliyun.oss.model.OSSObjectSummary; -import com.aliyun.oss.model.ObjectListing; -import com.aliyun.oss.model.ObjectMetadata; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Implementation of {@link FileSystem} for - * Aliyun OSS, used to access OSS blob system in a filesystem style. - */ -public class AliyunOSSFileSystem extends FileSystem { - private static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSFileSystem.class); - private URI uri; - private String bucket; - private Path workingDir; - private AliyunOSSFileSystemStore store; - private int maxKeys; - - @Override - public FSDataOutputStream append(Path path, int bufferSize, - Progressable progress) throws IOException { - throw new IOException("Append is not supported!"); - } - - @Override - public void close() throws IOException { - try { - store.close(); - } finally { - super.close(); - } - } - - @Override - public FSDataOutputStream create(Path path, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { - String key = pathToKey(path); - FileStatus status = null; - - try { - // get the status or throw a FNFE - status = getFileStatus(path); - - // if the thread reaches here, there is something at the path - if (status.isDirectory()) { - // path references a directory - throw new FileAlreadyExistsException(path + " is a directory"); - } - if (!overwrite) { - // path references a file and overwrite is disabled - throw new FileAlreadyExistsException(path + " already exists"); - } - LOG.debug("Overwriting file {}", path); - } catch (FileNotFoundException e) { - // this means the file is not found - } - - return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), - store, key, progress, statistics), (Statistics)(null)); - } - - /** - * {@inheritDoc} - * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. - */ - @Override - public FSDataOutputStream createNonRecursive(Path path, - FsPermission permission, - EnumSet flags, - int bufferSize, - short replication, - long blockSize, - Progressable progress) throws IOException { - Path parent = path.getParent(); - if (parent != null) { - // expect this to raise an exception if there is no parent - if (!getFileStatus(parent).isDirectory()) { - throw new FileAlreadyExistsException("Not a directory: " + parent); - } - } - return create(path, permission, - flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - try { - return innerDelete(getFileStatus(path), recursive); - } catch (FileNotFoundException e) { - LOG.debug("Couldn't delete {} - does not exist", path); - return false; - } - } - - /** - * Delete an object. See {@link #delete(Path, boolean)}. - * - * @param status fileStatus object - * @param recursive if path is a directory and set to - * true, the directory is deleted else throws an exception. In - * case of a file the recursive can be set to either true or false. - * @return true if delete is successful else false. - * @throws IOException due to inability to delete a directory or file. - */ - private boolean innerDelete(FileStatus status, boolean recursive) - throws IOException { - Path f = status.getPath(); - String p = f.toUri().getPath(); - FileStatus[] statuses; - // indicating root directory "/". - if (p.equals("/")) { - statuses = listStatus(status.getPath()); - boolean isEmptyDir = statuses.length <= 0; - return rejectRootDirectoryDelete(isEmptyDir, recursive); - } - - String key = pathToKey(f); - if (status.isDirectory()) { - if (!recursive) { - // Check whether it is an empty directory or not - statuses = listStatus(status.getPath()); - if (statuses.length > 0) { - throw new IOException("Cannot remove directory " + f + - ": It is not empty!"); - } else { - // Delete empty directory without '-r' - key = AliyunOSSUtils.maybeAddTrailingSlash(key); - store.deleteObject(key); - } - } else { - store.deleteDirs(key); - } - } else { - store.deleteObject(key); - } - - createFakeDirectoryIfNecessary(f); - return true; - } - - /** - * Implements the specific logic to reject root directory deletion. - * The caller must return the result of this call, rather than - * attempt to continue with the delete operation: deleting root - * directories is never allowed. This method simply implements - * the policy of when to return an exit code versus raise an exception. - * @param isEmptyDir empty directory or not - * @param recursive recursive flag from command - * @return a return code for the operation - * @throws PathIOException if the operation was explicitly rejected. - */ - private boolean rejectRootDirectoryDelete(boolean isEmptyDir, - boolean recursive) throws IOException { - LOG.info("oss delete the {} root directory of {}", bucket, recursive); - if (isEmptyDir) { - return true; - } - if (recursive) { - return false; - } else { - // reject - throw new PathIOException(bucket, "Cannot delete root path"); - } - } - - private void createFakeDirectoryIfNecessary(Path f) throws IOException { - String key = pathToKey(f); - if (StringUtils.isNotEmpty(key) && !exists(f)) { - LOG.debug("Creating new fake directory at {}", f); - mkdir(pathToKey(f.getParent())); - } - } - - @Override - public FileStatus getFileStatus(Path path) throws IOException { - Path qualifiedPath = path.makeQualified(uri, workingDir); - String key = pathToKey(qualifiedPath); - - // Root always exists - if (key.length() == 0) { - return new FileStatus(0, true, 1, 0, 0, qualifiedPath); - } - - ObjectMetadata meta = store.getObjectMetadata(key); - // If key not found and key does not end with "/" - if (meta == null && !key.endsWith("/")) { - // In case of 'dir + "/"' - key += "/"; - meta = store.getObjectMetadata(key); - } - if (meta == null) { - ObjectListing listing = store.listObjects(key, 1, null, false); - if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) || - CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) { - return new FileStatus(0, true, 1, 0, 0, qualifiedPath); - } else { - throw new FileNotFoundException(path + ": No such file or directory!"); - } - } else if (objectRepresentsDirectory(key, meta.getContentLength())) { - return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(), - qualifiedPath); - } else { - return new FileStatus(meta.getContentLength(), false, 1, - getDefaultBlockSize(path), meta.getLastModified().getTime(), - qualifiedPath); - } - } - - @Override - public String getScheme() { - return "oss"; - } - - @Override - public URI getUri() { - return uri; - } - - @Override - public Path getWorkingDirectory() { - return workingDir; - } - - @Deprecated - public long getDefaultBlockSize() { - return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT); - } - - @Override - public String getCanonicalServiceName() { - // Does not support Token - return null; - } - - /** - * Initialize new FileSystem. - * - * @param name the uri of the file system, including host, port, etc. - * @param conf configuration of the file system - * @throws IOException IO problems - */ - public void initialize(URI name, Configuration conf) throws IOException { - super.initialize(name, conf); - - bucket = name.getHost(); - uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); - workingDir = new Path("/user", - System.getProperty("user.name")).makeQualified(uri, null); - - store = new AliyunOSSFileSystemStore(); - store.initialize(name, conf, statistics); - maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); - setConf(conf); - } - - /** - * Check if OSS object represents a directory. - * - * @param name object key - * @param size object content length - * @return true if object represents a directory - */ - private boolean objectRepresentsDirectory(final String name, - final long size) { - return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L; - } - - /** - * Turn a path (relative or otherwise) into an OSS key. - * - * @param path the path of the file. - * @return the key of the object that represents the file. - */ - private String pathToKey(Path path) { - if (!path.isAbsolute()) { - path = new Path(workingDir, path); - } - - return path.toUri().getPath().substring(1); - } - - private Path keyToPath(String key) { - return new Path("/" + key); - } - - @Override - public FileStatus[] listStatus(Path path) throws IOException { - String key = pathToKey(path); - if (LOG.isDebugEnabled()) { - LOG.debug("List status for path: " + path); - } - - final List result = new ArrayList(); - final FileStatus fileStatus = getFileStatus(path); - - if (fileStatus.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: doing listObjects for directory " + key); - } - - ObjectListing objects = store.listObjects(key, maxKeys, null, false); - while (true) { - statistics.incrementReadOps(1); - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - String objKey = objectSummary.getKey(); - if (objKey.equals(key + "/")) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + objKey); - } - continue; - } else { - Path keyPath = keyToPath(objectSummary.getKey()) - .makeQualified(uri, workingDir); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: fi: " + keyPath); - } - result.add(new FileStatus(objectSummary.getSize(), false, 1, - getDefaultBlockSize(keyPath), - objectSummary.getLastModified().getTime(), keyPath)); - } - } - - for (String prefix : objects.getCommonPrefixes()) { - if (prefix.equals(key + "/")) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring: " + prefix); - } - continue; - } else { - Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd: " + keyPath); - } - result.add(getFileStatus(keyPath)); - } - } - - if (objects.isTruncated()) { - if (LOG.isDebugEnabled()) { - LOG.debug("listStatus: list truncated - getting next batch"); - } - String nextMarker = objects.getNextMarker(); - objects = store.listObjects(key, maxKeys, nextMarker, false); - statistics.incrementReadOps(1); - } else { - break; - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding: rd (not a dir): " + path); - } - result.add(fileStatus); - } - - return result.toArray(new FileStatus[result.size()]); - } - - /** - * Used to create an empty file that represents an empty directory. - * - * @param key directory path - * @return true if directory is successfully created - * @throws IOException - */ - private boolean mkdir(final String key) throws IOException { - String dirName = key; - if (StringUtils.isNotEmpty(key)) { - if (!key.endsWith("/")) { - dirName += "/"; - } - store.storeEmptyFile(dirName); - } - return true; - } - - @Override - public boolean mkdirs(Path path, FsPermission permission) - throws IOException { - try { - FileStatus fileStatus = getFileStatus(path); - - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + path); - } - } catch (FileNotFoundException e) { - validatePath(path); - String key = pathToKey(path); - return mkdir(key); - } - } - - /** - * Check whether the path is a valid path. - * - * @param path the path to be checked. - * @throws IOException - */ - private void validatePath(Path path) throws IOException { - Path fPart = path.getParent(); - do { - try { - FileStatus fileStatus = getFileStatus(fPart); - if (fileStatus.isDirectory()) { - // If path exists and a directory, exit - break; - } else { - throw new FileAlreadyExistsException(String.format( - "Can't make directory for path '%s', it is a file.", fPart)); - } - } catch (FileNotFoundException fnfe) { - } - fPart = fPart.getParent(); - } while (fPart != null); - } - - @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - final FileStatus fileStatus = getFileStatus(path); - if (fileStatus.isDirectory()) { - throw new FileNotFoundException("Can't open " + path + - " because it is a directory"); - } - - return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store, - pathToKey(path), fileStatus.getLen(), statistics)); - } - - @Override - public boolean rename(Path srcPath, Path dstPath) throws IOException { - if (srcPath.isRoot()) { - // Cannot rename root of file system - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot rename the root of a filesystem"); - } - return false; - } - Path parent = dstPath.getParent(); - while (parent != null && !srcPath.equals(parent)) { - parent = parent.getParent(); - } - if (parent != null) { - return false; - } - FileStatus srcStatus = getFileStatus(srcPath); - FileStatus dstStatus; - try { - dstStatus = getFileStatus(dstPath); - } catch (FileNotFoundException fnde) { - dstStatus = null; - } - if (dstStatus == null) { - // If dst doesn't exist, check whether dst dir exists or not - dstStatus = getFileStatus(dstPath.getParent()); - if (!dstStatus.isDirectory()) { - throw new IOException(String.format( - "Failed to rename %s to %s, %s is a file", srcPath, dstPath, - dstPath.getParent())); - } - } else { - if (srcStatus.getPath().equals(dstStatus.getPath())) { - return !srcStatus.isDirectory(); - } else if (dstStatus.isDirectory()) { - // If dst is a directory - dstPath = new Path(dstPath, srcPath.getName()); - FileStatus[] statuses; - try { - statuses = listStatus(dstPath); - } catch (FileNotFoundException fnde) { - statuses = null; - } - if (statuses != null && statuses.length > 0) { - // If dst exists and not a directory / not empty - throw new FileAlreadyExistsException(String.format( - "Failed to rename %s to %s, file already exists or not empty!", - srcPath, dstPath)); - } - } else { - // If dst is not a directory - throw new FileAlreadyExistsException(String.format( - "Failed to rename %s to %s, file already exists!", srcPath, - dstPath)); - } - } - if (srcStatus.isDirectory()) { - copyDirectory(srcPath, dstPath); - } else { - copyFile(srcPath, dstPath); - } - - return srcPath.equals(dstPath) || delete(srcPath, true); - } - - /** - * Copy file from source path to destination path. - * (the caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcPath source path. - * @param dstPath destination path. - * @return true if file is successfully copied. - */ - private boolean copyFile(Path srcPath, Path dstPath) { - String srcKey = pathToKey(srcPath); - String dstKey = pathToKey(dstPath); - return store.copyFile(srcKey, dstKey); - } - - /** - * Copy a directory from source path to destination path. - * (the caller should make sure srcPath is a directory, and dstPath is valid) - * - * @param srcPath source path. - * @param dstPath destination path. - * @return true if directory is successfully copied. - */ - private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { - String srcKey = AliyunOSSUtils - .maybeAddTrailingSlash(pathToKey(srcPath)); - String dstKey = AliyunOSSUtils - .maybeAddTrailingSlash(pathToKey(dstPath)); - - if (dstKey.startsWith(srcKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Cannot rename a directory to a subdirectory of self"); - } - return false; - } - - store.storeEmptyFile(dstKey); - ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); - statistics.incrementReadOps(1); - // Copy files from src folder to dst - while (true) { - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - String newKey = - dstKey.concat(objectSummary.getKey().substring(srcKey.length())); - store.copyFile(objectSummary.getKey(), newKey); - } - if (objects.isTruncated()) { - String nextMarker = objects.getNextMarker(); - objects = store.listObjects(srcKey, maxKeys, nextMarker, true); - statistics.incrementReadOps(1); - } else { - break; - } - } - return true; - } - - @Override - public void setWorkingDirectory(Path dir) { - this.workingDir = dir; - } - - public AliyunOSSFileSystemStore getStore() { - return store; - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java deleted file mode 100644 index aba3db8cf8a..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ /dev/null @@ -1,549 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.aliyun.oss; - -import com.aliyun.oss.ClientConfiguration; -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSSClient; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.common.auth.CredentialsProvider; -import com.aliyun.oss.common.comm.Protocol; -import com.aliyun.oss.model.AbortMultipartUploadRequest; -import com.aliyun.oss.model.CannedAccessControlList; -import com.aliyun.oss.model.CompleteMultipartUploadRequest; -import com.aliyun.oss.model.CompleteMultipartUploadResult; -import com.aliyun.oss.model.CopyObjectResult; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.DeleteObjectsResult; -import com.aliyun.oss.model.GetObjectRequest; -import com.aliyun.oss.model.InitiateMultipartUploadRequest; -import com.aliyun.oss.model.InitiateMultipartUploadResult; -import com.aliyun.oss.model.ListObjectsRequest; -import com.aliyun.oss.model.ObjectMetadata; -import com.aliyun.oss.model.ObjectListing; -import com.aliyun.oss.model.OSSObjectSummary; -import com.aliyun.oss.model.PartETag; -import com.aliyun.oss.model.PutObjectResult; -import com.aliyun.oss.model.UploadPartCopyRequest; -import com.aliyun.oss.model.UploadPartCopyResult; -import com.aliyun.oss.model.UploadPartRequest; -import com.aliyun.oss.model.UploadPartResult; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Core implementation of Aliyun OSS Filesystem for Hadoop. - * Provides the bridging logic between Hadoop's abstract filesystem and - * Aliyun OSS. - */ -public class AliyunOSSFileSystemStore { - public static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSFileSystemStore.class); - private FileSystem.Statistics statistics; - private OSSClient ossClient; - private String bucketName; - private long uploadPartSize; - private long multipartThreshold; - private long partSize; - private int maxKeys; - private String serverSideEncryptionAlgorithm; - - public void initialize(URI uri, Configuration conf, - FileSystem.Statistics stat) throws IOException { - statistics = stat; - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, - MAXIMUM_CONNECTIONS_DEFAULT)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY, - SECURE_CONNECTIONS_DEFAULT); - clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY, - MAX_ERROR_RETRIES_DEFAULT)); - clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY, - ESTABLISH_TIMEOUT_DEFAULT)); - clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY, - SOCKET_TIMEOUT_DEFAULT)); - - String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, ""); - int proxyPort = conf.getInt(PROXY_PORT_KEY, -1); - if (StringUtils.isNotEmpty(proxyHost)) { - clientConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - clientConf.setProxyPort(proxyPort); - } else { - if (secureConnections) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - clientConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - clientConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " + - PROXY_PASSWORD_KEY + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - clientConf.setProxyUsername(proxyUsername); - clientConf.setProxyPassword(proxyPassword); - clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY)); - clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY)); - } else if (proxyPort >= 0) { - String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " + - PROXY_HOST_KEY; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - - String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); - if (StringUtils.isEmpty(endPoint)) { - throw new IllegalArgumentException("Aliyun OSS endpoint should not be " + - "null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); - } - CredentialsProvider provider = - AliyunOSSUtils.getCredentialsProvider(conf); - ossClient = new OSSClient(endPoint, provider, clientConf); - uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { - partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; - } - serverSideEncryptionAlgorithm = - conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); - - if (uploadPartSize < 5 * 1024 * 1024) { - LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); - uploadPartSize = 5 * 1024 * 1024; - } - - if (multipartThreshold < 5 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); - multipartThreshold = 5 * 1024 * 1024; - } - - if (multipartThreshold > 1024 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); - multipartThreshold = 1024 * 1024 * 1024; - } - - String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); - if (StringUtils.isNotEmpty(cannedACLName)) { - CannedAccessControlList cannedACL = - CannedAccessControlList.valueOf(cannedACLName); - ossClient.setBucketAcl(bucketName, cannedACL); - } - - maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); - bucketName = uri.getHost(); - } - - /** - * Delete an object, and update write operation statistics. - * - * @param key key to blob to delete. - */ - public void deleteObject(String key) { - ossClient.deleteObject(bucketName, key); - statistics.incrementWriteOps(1); - } - - /** - * Delete a list of keys, and update write operation statistics. - * - * @param keysToDelete collection of keys to delete. - * @throws IOException if failed to delete objects. - */ - public void deleteObjects(List keysToDelete) throws IOException { - if (CollectionUtils.isEmpty(keysToDelete)) { - LOG.warn("Keys to delete is empty."); - return; - } - - int retry = 10; - int tries = 0; - List deleteFailed = keysToDelete; - while(CollectionUtils.isNotEmpty(deleteFailed)) { - DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName); - deleteRequest.setKeys(deleteFailed); - // There are two modes to do batch delete: - // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects - // which were deleted successfully. - // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects - // which were deleted unsuccessfully. - // Here, we choose the simple mode to do batch delete. - deleteRequest.setQuiet(true); - DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest); - deleteFailed = result.getDeletedObjects(); - tries++; - if (tries == retry) { - break; - } - } - - if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) { - // Most of time, it is impossible to try 10 times, expect the - // Aliyun OSS service problems. - throw new IOException("Failed to delete Aliyun OSS objects for " + - tries + " times."); - } - } - - /** - * Delete a directory from Aliyun OSS. - * - * @param key directory key to delete. - * @throws IOException if failed to delete directory. - */ - public void deleteDirs(String key) throws IOException { - key = AliyunOSSUtils.maybeAddTrailingSlash(key); - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(key); - listRequest.setDelimiter(null); - listRequest.setMaxKeys(maxKeys); - - while (true) { - ObjectListing objects = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); - List keysToDelete = new ArrayList(); - for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { - keysToDelete.add(objectSummary.getKey()); - } - deleteObjects(keysToDelete); - if (objects.isTruncated()) { - listRequest.setMarker(objects.getNextMarker()); - } else { - break; - } - } - } - - /** - * Return metadata of a given object key. - * - * @param key object key. - * @return return null if key does not exist. - */ - public ObjectMetadata getObjectMetadata(String key) { - try { - return ossClient.getObjectMetadata(bucketName, key); - } catch (OSSException osse) { - return null; - } finally { - statistics.incrementReadOps(1); - } - } - - /** - * Upload an empty file as an OSS object, using single upload. - * - * @param key object key. - * @throws IOException if failed to upload object. - */ - public void storeEmptyFile(String key) throws IOException { - ObjectMetadata dirMeta = new ObjectMetadata(); - byte[] buffer = new byte[0]; - ByteArrayInputStream in = new ByteArrayInputStream(buffer); - dirMeta.setContentLength(0); - try { - ossClient.putObject(bucketName, key, in, dirMeta); - } finally { - in.close(); - } - } - - /** - * Copy an object from source key to destination key. - * - * @param srcKey source key. - * @param dstKey destination key. - * @return true if file is successfully copied. - */ - public boolean copyFile(String srcKey, String dstKey) { - ObjectMetadata objectMeta = - ossClient.getObjectMetadata(bucketName, srcKey); - long contentLength = objectMeta.getContentLength(); - if (contentLength <= multipartThreshold) { - return singleCopy(srcKey, dstKey); - } else { - return multipartCopy(srcKey, contentLength, dstKey); - } - } - - /** - * Use single copy to copy an OSS object. - * (The caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcKey source key. - * @param dstKey destination key. - * @return true if object is successfully copied. - */ - private boolean singleCopy(String srcKey, String dstKey) { - CopyObjectResult copyResult = - ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); - LOG.debug(copyResult.getETag()); - return true; - } - - /** - * Use multipart copy to copy an OSS object. - * (The caller should make sure srcPath is a file and dstPath is valid) - * - * @param srcKey source key. - * @param contentLength data size of the object to copy. - * @param dstKey destination key. - * @return true if success, or false if upload is aborted. - */ - private boolean multipartCopy(String srcKey, long contentLength, - String dstKey) { - long realPartSize = - AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize); - int partNum = (int) (contentLength / realPartSize); - if (contentLength % realPartSize != 0) { - partNum++; - } - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, dstKey); - ObjectMetadata meta = new ObjectMetadata(); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - String uploadId = initiateMultipartUploadResult.getUploadId(); - List partETags = new ArrayList(); - try { - for (int i = 0; i < partNum; i++) { - long skipBytes = realPartSize * i; - long size = (realPartSize < contentLength - skipBytes) ? - realPartSize : contentLength - skipBytes; - UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest(); - partCopyRequest.setSourceBucketName(bucketName); - partCopyRequest.setSourceKey(srcKey); - partCopyRequest.setBucketName(bucketName); - partCopyRequest.setKey(dstKey); - partCopyRequest.setUploadId(uploadId); - partCopyRequest.setPartSize(size); - partCopyRequest.setBeginIndex(skipBytes); - partCopyRequest.setPartNumber(i + 1); - UploadPartCopyResult partCopyResult = - ossClient.uploadPartCopy(partCopyRequest); - statistics.incrementWriteOps(1); - partETags.add(partCopyResult.getPartETag()); - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, dstKey, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - return true; - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, dstKey, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - return false; - } - } - - /** - * Upload a file as an OSS object, using single upload. - * - * @param key object key. - * @param file local file to upload. - * @throws IOException if failed to upload object. - */ - public void uploadObject(String key, File file) throws IOException { - File object = file.getAbsoluteFile(); - FileInputStream fis = new FileInputStream(object); - ObjectMetadata meta = new ObjectMetadata(); - meta.setContentLength(object.length()); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - try { - PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); - LOG.debug(result.getETag()); - statistics.incrementWriteOps(1); - } finally { - fis.close(); - } - } - - /** - * Upload a file as an OSS object, using multipart upload. - * - * @param key object key. - * @param file local file to upload. - * @throws IOException if failed to upload object. - */ - public void multipartUploadObject(String key, File file) throws IOException { - File object = file.getAbsoluteFile(); - long dataLen = object.length(); - long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); - int partNum = (int) (dataLen / realPartSize); - if (dataLen % realPartSize != 0) { - partNum += 1; - } - - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, key); - ObjectMetadata meta = new ObjectMetadata(); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - List partETags = new ArrayList(); - String uploadId = initiateMultipartUploadResult.getUploadId(); - - try { - for (int i = 0; i < partNum; i++) { - //TODO Optimize this, avoid opening the object multiple times. - FileInputStream fis = new FileInputStream(object); - try { - long skipBytes = realPartSize * i; - AliyunOSSUtils.skipFully(fis, skipBytes); - long size = (realPartSize < dataLen - skipBytes) ? - realPartSize : dataLen - skipBytes; - UploadPartRequest uploadPartRequest = new UploadPartRequest(); - uploadPartRequest.setBucketName(bucketName); - uploadPartRequest.setKey(key); - uploadPartRequest.setUploadId(uploadId); - uploadPartRequest.setInputStream(fis); - uploadPartRequest.setPartSize(size); - uploadPartRequest.setPartNumber(i + 1); - UploadPartResult uploadPartResult = - ossClient.uploadPart(uploadPartRequest); - statistics.incrementWriteOps(1); - partETags.add(uploadPartResult.getPartETag()); - } finally { - fis.close(); - } - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, key, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, key, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - } - } - - /** - * list objects. - * - * @param prefix prefix. - * @param maxListingLength max no. of entries - * @param marker last key in any previous search. - * @param recursive whether to list directory recursively. - * @return a list of matches. - */ - public ObjectListing listObjects(String prefix, int maxListingLength, - String marker, boolean recursive) { - String delimiter = recursive ? null : "/"; - prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); - ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); - listRequest.setPrefix(prefix); - listRequest.setDelimiter(delimiter); - listRequest.setMaxKeys(maxListingLength); - listRequest.setMarker(marker); - - ObjectListing listing = ossClient.listObjects(listRequest); - statistics.incrementReadOps(1); - return listing; - } - - /** - * Retrieve a part of an object. - * - * @param key the object name that is being retrieved from the Aliyun OSS. - * @param byteStart start position. - * @param byteEnd end position. - * @return This method returns null if the key is not found. - */ - public InputStream retrieve(String key, long byteStart, long byteEnd) { - try { - GetObjectRequest request = new GetObjectRequest(bucketName, key); - request.setRange(byteStart, byteEnd); - return ossClient.getObject(request).getObjectContent(); - } catch (OSSException | ClientException e) { - return null; - } - } - - /** - * Close OSS client properly. - */ - public void close() { - if (ossClient != null) { - ossClient.shutdown(); - ossClient = null; - } - } - - /** - * Clean up all objects matching the prefix. - * - * @param prefix Aliyun OSS object prefix. - * @throws IOException if failed to clean up objects. - */ - public void purge(String prefix) throws IOException { - String key; - try { - ObjectListing objects = listObjects(prefix, maxKeys, null, true); - for (OSSObjectSummary object : objects.getObjectSummaries()) { - key = object.getKey(); - ossClient.deleteObject(bucketName, key); - } - - for (String dir: objects.getCommonPrefixes()) { - deleteDirs(dir); - } - } catch (OSSException | ClientException e) { - LOG.error("Failed to purge " + prefix); - } - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java deleted file mode 100644 index 3b2bc022e22..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; - -/** - * The input stream for OSS blob system. - * The class uses multi-part downloading to read data from the object content - * stream. - */ -public class AliyunOSSInputStream extends FSInputStream { - public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class); - private final long downloadPartSize; - private AliyunOSSFileSystemStore store; - private final String key; - private Statistics statistics; - private boolean closed; - private InputStream wrappedStream = null; - private long contentLength; - private long position; - private long partRemaining; - - public AliyunOSSInputStream(Configuration conf, - AliyunOSSFileSystemStore store, String key, Long contentLength, - Statistics statistics) throws IOException { - this.store = store; - this.key = key; - this.statistics = statistics; - this.contentLength = contentLength; - downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, - MULTIPART_DOWNLOAD_SIZE_DEFAULT); - reopen(0); - closed = false; - } - - /** - * Reopen the wrapped stream at give position, by seeking for - * data of a part length from object content stream. - * - * @param pos position from start of a file - * @throws IOException if failed to reopen - */ - private synchronized void reopen(long pos) throws IOException { - long partSize; - - if (pos < 0) { - throw new EOFException("Cannot seek at negative position:" + pos); - } else if (pos > contentLength) { - throw new EOFException("Cannot seek after EOF, contentLength:" + - contentLength + " position:" + pos); - } else if (pos + downloadPartSize > contentLength) { - partSize = contentLength - pos; - } else { - partSize = downloadPartSize; - } - - if (wrappedStream != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Aborting old stream to open at pos " + pos); - } - wrappedStream.close(); - } - - wrappedStream = store.retrieve(key, pos, pos + partSize -1); - if (wrappedStream == null) { - throw new IOException("Null IO stream"); - } - position = pos; - partRemaining = partSize; - } - - @Override - public synchronized int read() throws IOException { - checkNotClosed(); - - if (partRemaining <= 0 && position < contentLength) { - reopen(position); - } - - int tries = MAX_RETRIES; - boolean retry; - int byteRead = -1; - do { - retry = false; - try { - byteRead = wrappedStream.read(); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; - } - } while (retry); - if (byteRead >= 0) { - position++; - partRemaining--; - } - - if (statistics != null && byteRead >= 0) { - statistics.incrementBytesRead(byteRead); - } - return byteRead; - } - - - /** - * Verify that the input stream is open. Non blocking; this gives - * the last state of the volatile {@link #closed} field. - * - * @throws IOException if the connection is closed. - */ - private void checkNotClosed() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } - - @Override - public synchronized int read(byte[] buf, int off, int len) - throws IOException { - checkNotClosed(); - - if (buf == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > buf.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - - int bytesRead = 0; - // Not EOF, and read not done - while (position < contentLength && bytesRead < len) { - if (partRemaining == 0) { - reopen(position); - } - - int tries = MAX_RETRIES; - boolean retry; - int bytes = -1; - do { - retry = false; - try { - bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead); - } catch (Exception e) { - handleReadException(e, --tries); - retry = true; - } - } while (retry); - - if (bytes > 0) { - bytesRead += bytes; - position += bytes; - partRemaining -= bytes; - } else if (partRemaining != 0) { - throw new IOException("Failed to read from stream. Remaining:" + - partRemaining); - } - } - - if (statistics != null && bytesRead > 0) { - statistics.incrementBytesRead(bytesRead); - } - - // Read nothing, but attempt to read something - if (bytesRead == 0 && len > 0) { - return -1; - } else { - return bytesRead; - } - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - if (wrappedStream != null) { - wrappedStream.close(); - } - } - - @Override - public synchronized int available() throws IOException { - checkNotClosed(); - - long remaining = contentLength - position; - if (remaining > Integer.MAX_VALUE) { - return Integer.MAX_VALUE; - } - return (int)remaining; - } - - @Override - public synchronized void seek(long pos) throws IOException { - checkNotClosed(); - if (position == pos) { - return; - } else if (pos > position && pos < position + partRemaining) { - long len = pos - position; - AliyunOSSUtils.skipFully(wrappedStream, len); - position = pos; - partRemaining -= len; - } else { - reopen(pos); - } - } - - @Override - public synchronized long getPos() throws IOException { - checkNotClosed(); - return position; - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - checkNotClosed(); - return false; - } - - private void handleReadException(Exception e, int tries) throws IOException{ - if (tries == 0) { - throw new IOException(e); - } - - LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" + - " connection at position '" + position + "', " + e.getMessage()); - try { - Thread.sleep(100); - } catch (InterruptedException e2) { - LOG.warn(e2.getMessage()); - } - reopen(position); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java deleted file mode 100644 index c952d0ae858..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.util.Progressable; - -/** - * The output stream for OSS blob system. - * Data will be buffered on local disk, then uploaded to OSS in - * {@link #close()} method. - */ -public class AliyunOSSOutputStream extends OutputStream { - public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class); - private AliyunOSSFileSystemStore store; - private final String key; - private Statistics statistics; - private Progressable progress; - private long partSizeThreshold; - private LocalDirAllocator dirAlloc; - private boolean closed; - private File tmpFile; - private BufferedOutputStream backupStream; - - public AliyunOSSOutputStream(Configuration conf, - AliyunOSSFileSystemStore store, String key, Progressable progress, - Statistics statistics) throws IOException { - this.store = store; - this.key = key; - // The caller cann't get any progress information - this.progress = progress; - this.statistics = statistics; - partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - - if (conf.get(BUFFER_DIR_KEY) == null) { - conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); - } - dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); - - tmpFile = dirAlloc.createTmpFileForWrite("output-", - LocalDirAllocator.SIZE_UNKNOWN, conf); - backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile)); - closed = false; - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - if (backupStream != null) { - backupStream.close(); - } - long dataLen = tmpFile.length(); - try { - if (dataLen <= partSizeThreshold) { - store.uploadObject(key, tmpFile); - } else { - store.multipartUploadObject(key, tmpFile); - } - } finally { - if (!tmpFile.delete()) { - LOG.warn("Can not delete file: " + tmpFile); - } - } - } - - - - @Override - public synchronized void flush() throws IOException { - backupStream.flush(); - } - - @Override - public synchronized void write(int b) throws IOException { - backupStream.write(b); - statistics.incrementBytesWritten(1); - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java deleted file mode 100644 index 263b4cf8880..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import java.io.IOException; -import java.io.InputStream; - -import com.aliyun.oss.common.auth.CredentialsProvider; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.ProviderUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Utility methods for Aliyun OSS code. - */ -public final class AliyunOSSUtils { - private static final Logger LOG = - LoggerFactory.getLogger(AliyunOSSUtils.class); - - private AliyunOSSUtils() { - } - - /** - * Used to get password from configuration. - * - * @param conf configuration that contains password information - * @param key the key of the password - * @return the value for the key - * @throws IOException if failed to get password from configuration - */ - public static String getValueWithKey(Configuration conf, String key) - throws IOException { - try { - final char[] pass = conf.getPassword(key); - if (pass != null) { - return (new String(pass)).trim(); - } else { - return ""; - } - } catch (IOException ioe) { - throw new IOException("Cannot find password option " + key, ioe); - } - } - - /** - * Skip the requested number of bytes or fail if there are no enough bytes - * left. This allows for the possibility that {@link InputStream#skip(long)} - * may not skip as many bytes as requested (most likely because of reaching - * EOF). - * - * @param is the input stream to skip. - * @param n the number of bytes to skip. - * @throws IOException thrown when skipped less number of bytes. - */ - public static void skipFully(InputStream is, long n) throws IOException { - long total = 0; - long cur = 0; - - do { - cur = is.skip(n - total); - total += cur; - } while((total < n) && (cur > 0)); - - if (total < n) { - throw new IOException("Failed to skip " + n + " bytes, possibly due " + - "to EOF."); - } - } - - /** - * Calculate a proper size of multipart piece. If minPartSize - * is too small, the number of multipart pieces may exceed the limit of - * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}. - * - * @param contentLength the size of file. - * @param minPartSize the minimum size of multipart piece. - * @return a revisional size of multipart piece. - */ - public static long calculatePartSize(long contentLength, long minPartSize) { - long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1; - return Math.max(minPartSize, tmpPartSize); - } - - /** - * Create credential provider specified by configuration, or create default - * credential provider if not specified. - * - * @param conf configuration - * @return a credential provider - * @throws IOException on any problem. Class construction issues may be - * nested inside the IOE. - */ - public static CredentialsProvider getCredentialsProvider(Configuration conf) - throws IOException { - CredentialsProvider credentials; - - String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); - if (StringUtils.isEmpty(className)) { - Configuration newConf = - ProviderUtils.excludeIncompatibleCredentialProviders(conf, - AliyunOSSFileSystem.class); - credentials = new AliyunCredentialsProvider(newConf); - } else { - try { - LOG.debug("Credential provider class is:" + className); - Class credClass = Class.forName(className); - try { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor( - Configuration.class).newInstance(conf); - } catch (NoSuchMethodException | SecurityException e) { - credentials = - (CredentialsProvider)credClass.getDeclaredConstructor() - .newInstance(); - } - } catch (ClassNotFoundException e) { - throw new IOException(className + " not found.", e); - } catch (NoSuchMethodException | SecurityException e) { - throw new IOException(String.format("%s constructor exception. A " + - "class specified in %s must provide an accessible constructor " + - "accepting URI and Configuration, or an accessible default " + - "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), - e); - } catch (ReflectiveOperationException | IllegalArgumentException e) { - throw new IOException(className + " instantiation exception.", e); - } - } - - return credentials; - } - - /** - * Turns a path (relative or otherwise) into an OSS key, adding a trailing - * "/" if the path is not the root and does not already have a "/" - * at the end. - * - * @param key OSS key or "" - * @return the with a trailing "/", or, if it is the root key, "". - */ - public static String maybeAddTrailingSlash(String key) { - if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) { - return key + '/'; - } else { - return key; - } - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java deleted file mode 100644 index 04a2ccd6c54..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -/** - * ALL configuration constants for OSS filesystem. - */ -public final class Constants { - - private Constants() { - } - - // Class of credential provider - public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY = - "fs.oss.credentials.provider"; - - // OSS access verification - public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId"; - public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"; - public static final String SECURITY_TOKEN = "fs.oss.securityToken"; - - // Number of simultaneous connections to oss - public static final String MAXIMUM_CONNECTIONS_KEY = - "fs.oss.connection.maximum"; - public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32; - - // Connect to oss over ssl - public static final String SECURE_CONNECTIONS_KEY = - "fs.oss.connection.secure.enabled"; - public static final boolean SECURE_CONNECTIONS_DEFAULT = true; - - // Use a custom endpoint - public static final String ENDPOINT_KEY = "fs.oss.endpoint"; - - // Connect to oss through a proxy server - public static final String PROXY_HOST_KEY = "fs.oss.proxy.host"; - public static final String PROXY_PORT_KEY = "fs.oss.proxy.port"; - public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username"; - public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password"; - public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain"; - public static final String PROXY_WORKSTATION_KEY = - "fs.oss.proxy.workstation"; - - // Number of times we should retry errors - public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum"; - public static final int MAX_ERROR_RETRIES_DEFAULT = 20; - - // Time until we give up trying to establish a connection to oss - public static final String ESTABLISH_TIMEOUT_KEY = - "fs.oss.connection.establish.timeout"; - public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000; - - // Time until we give up on a connection to oss - public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout"; - public static final int SOCKET_TIMEOUT_DEFAULT = 200000; - - // Number of records to get while paging through a directory listing - public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum"; - public static final int MAX_PAGING_KEYS_DEFAULT = 1000; - - // Size of each of or multipart pieces in bytes - public static final String MULTIPART_UPLOAD_SIZE_KEY = - "fs.oss.multipart.upload.size"; - - public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024; - public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000; - - // Minimum size in bytes before we start a multipart uploads or copy - public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY = - "fs.oss.multipart.upload.threshold"; - public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT = - 20 * 1024 * 1024; - - public static final String MULTIPART_DOWNLOAD_SIZE_KEY = - "fs.oss.multipart.download.size"; - - public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024; - - // Comma separated list of directories - public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir"; - - // private | public-read | public-read-write - public static final String CANNED_ACL_KEY = "fs.oss.acl.default"; - public static final String CANNED_ACL_DEFAULT = ""; - - // OSS server-side encryption - public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY = - "fs.oss.server-side-encryption-algorithm"; - - public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; - public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; - public static final String FS_OSS = "oss"; - - public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; - public static final int MAX_RETRIES = 10; - -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java deleted file mode 100644 index 234567b2b00..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Aliyun OSS Filesystem. - */ -package org.apache.hadoop.fs.aliyun.oss; \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md deleted file mode 100644 index 62e650506ff..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md +++ /dev/null @@ -1,294 +0,0 @@ - - -# Hadoop-Aliyun module: Integration with Aliyun Web Services - - - -## Overview - -The `hadoop-aliyun` module provides support for Aliyun integration with -[Aliyun Object Storage Service (Aliyun OSS)](https://www.aliyun.com/product/oss). -The generated JAR file, `hadoop-aliyun.jar` also declares a transitive -dependency on all external artifacts which are needed for this support — enabling -downstream applications to easily use this support. - -To make it part of Apache Hadoop's default classpath, simply make sure -that HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aliyun' in the list. - -### Features - -* Read and write data stored in Aliyun OSS. -* Present a hierarchical file system view by implementing the standard Hadoop -[`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface. -* Can act as a source of data in a MapReduce job, or a sink. - -### Warning #1: Object Stores are not filesystems. - -Aliyun OSS is an example of "an object store". In order to achieve scalability -and especially high availability, Aliyun OSS has relaxed some of the constraints -which classic "POSIX" filesystems promise. - - - -Specifically - -1. Atomic operations: `delete()` and `rename()` are implemented by recursive -file-by-file operations. They take time at least proportional to the number of files, -during which time partial updates may be visible. `delete()` and `rename()` -can not guarantee atomicity. If the operations are interrupted, the filesystem -is left in an intermediate state. -2. File owner and group are persisted, but the permissions model is not enforced. -Authorization occurs at the level of the entire Aliyun account via -[Aliyun Resource Access Management (Aliyun RAM)](https://www.aliyun.com/product/ram). -3. Directory last access time is not tracked. -4. The append operation is not supported. - -### Warning #2: Directory last access time is not tracked, -features of Hadoop relying on this can have unexpected behaviour. E.g. the -AggregatedLogDeletionService of YARN will not remove the appropriate logfiles. - -### Warning #3: Your Aliyun credentials are valuable - -Your Aliyun credentials not only pay for services, they offer read and write -access to the data. Anyone with the account can not only read your datasets -—they can delete them. - -Do not inadvertently share these credentials through means such as -1. Checking in to SCM any configuration files containing the secrets. -2. Logging them to a console, as they invariably end up being seen. -3. Defining filesystem URIs with the credentials in the URL, such as -`oss://accessKeyId:accessKeySecret@directory/file`. They will end up in -logs and error messages. -4. Including the secrets in bug reports. - -If you do any of these: change your credentials immediately! - -### Warning #4: The Aliyun OSS client provided by Aliyun E-MapReduce are different from this implementation - -Specifically: on Aliyun E-MapReduce, `oss://` is also supported but with -a different implementation. If you are using Aliyun E-MapReduce, -follow these instructions —and be aware that all issues related to Aliyun -OSS integration in E-MapReduce can only be addressed by Aliyun themselves: -please raise your issues with them. - -## OSS - -### Authentication properties - - - fs.oss.accessKeyId - Aliyun access key ID - - - - fs.oss.accessKeySecret - Aliyun access key secret - - - - fs.oss.credentials.provider - - Class name of a credentials provider that implements - com.aliyun.oss.common.auth.CredentialsProvider. Omit if using access/secret keys - or another authentication mechanism. The specified class must provide an - accessible constructor accepting java.net.URI and - org.apache.hadoop.conf.Configuration, or an accessible default constructor. - - - -### Other properties - - - fs.oss.endpoint - Aliyun OSS endpoint to connect to. An up-to-date list is - provided in the Aliyun OSS Documentation. - - - - - fs.oss.proxy.host - Hostname of the (optinal) proxy server for Aliyun OSS connection - - - - fs.oss.proxy.port - Proxy server port - - - - fs.oss.proxy.username - Username for authenticating with proxy server - - - - fs.oss.proxy.password - Password for authenticating with proxy server. - - - - fs.oss.proxy.domain - Domain for authenticating with proxy server. - - - - fs.oss.proxy.workstation - Workstation for authenticating with proxy server. - - - - fs.oss.attempts.maximum - 20 - How many times we should retry commands on transient errors. - - - - fs.oss.connection.establish.timeout - 50000 - Connection setup timeout in milliseconds. - - - - fs.oss.connection.timeout - 200000 - Socket connection timeout in milliseconds. - - - - fs.oss.paging.maximum - 1000 - How many keys to request from Aliyun OSS when doing directory listings at a time. - - - - - fs.oss.multipart.upload.size - 10485760 - Size of each of multipart pieces in bytes. - - - - fs.oss.multipart.upload.threshold - 20971520 - Minimum size in bytes before we start a multipart uploads or copy. - - - - fs.oss.multipart.download.size - 102400/value> - Size in bytes in each request from ALiyun OSS. - - - - fs.oss.buffer.dir - Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS - - - - fs.oss.acl.default - - Set a canned ACL for bucket. Value may be private, public-read, public-read-write. - - - - - fs.oss.server-side-encryption-algorithm - - Specify a server-side encryption algorithm for oss: file system. - Unset by default, and the only other currently allowable value is AES256. - - - - - fs.oss.connection.maximum - 32 - Number of simultaneous connections to oss. - - - - fs.oss.connection.secure.enabled - true - Connect to oss over ssl or not, true by default. - - -## Testing the hadoop-aliyun Module - -To test `oss://` filesystem client, two files which pass in authentication -details to the test runner are needed. - -1. `auth-keys.xml` -2. `core-site.xml` - -Those two configuration files must be put into -`hadoop-tools/hadoop-aliyun/src/test/resources`. - -### `core-site.xml` - -This file pre-exists and sources the configurations created in `auth-keys.xml`. - -For most cases, no modification is needed, unless a specific, non-default property -needs to be set during the testing. - -### `auth-keys.xml` - -This file triggers the testing of Aliyun OSS module. Without this file, -*none of the tests in this module will be executed* - -It contains the access key Id/secret and proxy information that are needed to -connect to Aliyun OSS, and an OSS bucket URL should be also provided. - -1. `test.fs.oss.name` : the URL of the bucket for Aliyun OSS tests - -The contents of the bucket will be cleaned during the testing process, so -do not use the bucket for any purpose other than testing. - -### Run Hadoop contract tests -Create file `contract-test-options.xml` under `/test/resources`. If a -specific file `fs.contract.test.fs.oss` test path is not defined, those -tests will be skipped. Credentials are also needed to run any of those -tests, they can be copied from `auth-keys.xml` or through direct -XInclude inclusion. Here is an example of `contract-test-options.xml`: - - - - - - - - - fs.contract.test.fs.oss - oss://spark-tests - - - - fs.oss.impl - org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem - - - - fs.oss.endpoint - oss-cn-hangzhou.aliyuncs.com - - - - fs.oss.buffer.dir - /tmp/oss - - - - fs.oss.multipart.download.size - 102400 - - diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java deleted file mode 100644 index 901cb2bd082..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSTestUtils.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.junit.internal.AssumptionViolatedException; - -import java.io.IOException; -import java.net.URI; - -/** - * Utility class for Aliyun OSS Tests. - */ -public final class AliyunOSSTestUtils { - - private AliyunOSSTestUtils() { - } - - /** - * Create the test filesystem. - * - * If the test.fs.oss.name property is not set, - * tests will fail. - * - * @param conf configuration - * @return the FS - * @throws IOException - */ - public static AliyunOSSFileSystem createTestFileSystem(Configuration conf) - throws IOException { - String fsname = conf.getTrimmed( - TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, ""); - - boolean liveTest = StringUtils.isNotEmpty(fsname); - URI testURI = null; - if (liveTest) { - testURI = URI.create(fsname); - liveTest = testURI.getScheme().equals(Constants.FS_OSS); - } - - if (!liveTest) { - throw new AssumptionViolatedException("No test filesystem in " - + TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME); - } - AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem(); - ossfs.initialize(testURI, conf); - return ossfs; - } - - /** - * Generate unique test path for multiple user tests. - * - * @return root test path - */ - public static String generateUniqueTestPath() { - String testUniqueForkId = System.getProperty("test.unique.fork.id"); - return testUniqueForkId == null ? "/test" : - "/" + testUniqueForkId + "/test"; - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java deleted file mode 100644 index e08a4dccfca..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import com.aliyun.oss.common.auth.Credentials; -import com.aliyun.oss.common.auth.InvalidCredentialsException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract; -import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID; -import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET; -import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN; - -/** - * Tests use of temporary credentials (for example, Aliyun STS & Aliyun OSS). - * This test extends a class that "does things to the root directory", and - * should only be used against transient filesystems where you don't care about - * the data. - */ -public class TestAliyunCredentials extends AbstractFSContractTestBase { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - - @Test - public void testCredentialMissingAccessKeyId() throws Throwable { - Configuration conf = new Configuration(); - conf.set(ACCESS_KEY_ID, ""); - conf.set(ACCESS_KEY_SECRET, "accessKeySecret"); - conf.set(SECURITY_TOKEN, "token"); - validateCredential(conf); - } - - @Test - public void testCredentialMissingAccessKeySecret() throws Throwable { - Configuration conf = new Configuration(); - conf.set(ACCESS_KEY_ID, "accessKeyId"); - conf.set(ACCESS_KEY_SECRET, ""); - conf.set(SECURITY_TOKEN, "token"); - validateCredential(conf); - } - - private void validateCredential(Configuration conf) { - try { - AliyunCredentialsProvider provider - = new AliyunCredentialsProvider(conf); - Credentials credentials = provider.getCredentials(); - fail("Expected a CredentialInitializationException, got " + credentials); - } catch (InvalidCredentialsException expected) { - // expected - } catch (IOException e) { - fail("Unexpected exception."); - } - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java deleted file mode 100644 index 19120a62e22..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystemContractBaseTest; -import org.apache.hadoop.fs.Path; - -import org.junit.Before; -import org.junit.Test; - -import java.io.FileNotFoundException; -import java.io.IOException; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeNotNull; -import static org.junit.Assume.assumeTrue; - -/** - * Tests a live Aliyun OSS system. - */ -public class TestAliyunOSSFileSystemContract - extends FileSystemContractBaseTest { - public static final String TEST_FS_OSS_NAME = "test.fs.oss.name"; - private static Path testRootPath = - new Path(AliyunOSSTestUtils.generateUniqueTestPath()); - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - fs = AliyunOSSTestUtils.createTestFileSystem(conf); - assumeNotNull(fs); - } - - public Path getTestBaseDir() { - return testRootPath; - } - - @Test - public void testMkdirsWithUmask() throws Exception { - // not supported - } - - @Test - public void testRootDirAlwaysExists() throws Exception { - //this will throw an exception if the path is not found - fs.getFileStatus(super.path("/")); - //this catches overrides of the base exists() method that don't - //use getFileStatus() as an existence probe - assertTrue("FileSystem.exists() fails for root", - fs.exists(super.path("/"))); - } - - @Test - public void testRenameRootDirForbidden() throws Exception { - assumeTrue(renameSupported()); - rename(super.path("/"), - super.path("/test/newRootDir"), - false, true, false); - } - - @Test - public void testDeleteSubdir() throws IOException { - Path parentDir = this.path("/test/hadoop"); - Path file = this.path("/test/hadoop/file"); - Path subdir = this.path("/test/hadoop/subdir"); - this.createFile(file); - - assertTrue("Created subdir", this.fs.mkdirs(subdir)); - assertTrue("File exists", this.fs.exists(file)); - assertTrue("Parent dir exists", this.fs.exists(parentDir)); - assertTrue("Subdir exists", this.fs.exists(subdir)); - - assertTrue("Deleted subdir", this.fs.delete(subdir, true)); - assertTrue("Parent should exist", this.fs.exists(parentDir)); - - assertTrue("Deleted file", this.fs.delete(file, false)); - assertTrue("Parent should exist", this.fs.exists(parentDir)); - } - - - @Override - protected boolean renameSupported() { - return true; - } - - @Test - public void testRenameNonExistentPath() throws Exception { - assumeTrue(renameSupported()); - Path src = this.path("/test/hadoop/path"); - Path dst = this.path("/test/new/newpath"); - try { - super.rename(src, dst, false, false, false); - fail("Should throw FileNotFoundException!"); - } catch (FileNotFoundException e) { - // expected - } - } - - @Test - public void testRenameFileMoveToNonExistentDirectory() throws Exception { - assumeTrue(renameSupported()); - Path src = this.path("/test/hadoop/file"); - this.createFile(src); - Path dst = this.path("/test/new/newfile"); - try { - super.rename(src, dst, false, true, false); - fail("Should throw FileNotFoundException!"); - } catch (FileNotFoundException e) { - // expected - } - } - - @Test - public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception { - assumeTrue(renameSupported()); - Path src = this.path("/test/hadoop/dir"); - this.fs.mkdirs(src); - Path dst = this.path("/test/new/newdir"); - try { - super.rename(src, dst, false, true, false); - fail("Should throw FileNotFoundException!"); - } catch (FileNotFoundException e) { - // expected - } - } - - @Test - public void testRenameFileMoveToExistingDirectory() throws Exception { - super.testRenameFileMoveToExistingDirectory(); - } - - @Test - public void testRenameFileAsExistingFile() throws Exception { - assumeTrue(renameSupported()); - Path src = this.path("/test/hadoop/file"); - this.createFile(src); - Path dst = this.path("/test/new/newfile"); - this.createFile(dst); - try { - super.rename(src, dst, false, true, true); - fail("Should throw FileAlreadyExistsException"); - } catch (FileAlreadyExistsException e) { - // expected - } - } - - @Test - public void testRenameDirectoryAsExistingFile() throws Exception { - assumeTrue(renameSupported()); - Path src = this.path("/test/hadoop/dir"); - this.fs.mkdirs(src); - Path dst = this.path("/test/new/newfile"); - this.createFile(dst); - try { - super.rename(src, dst, false, true, true); - fail("Should throw FileAlreadyExistsException"); - } catch (FileAlreadyExistsException e) { - // expected - } - } - - @Test - public void testGetFileStatusFileAndDirectory() throws Exception { - Path filePath = this.path("/test/oss/file1"); - this.createFile(filePath); - assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile()); - assertFalse("Should not be directory", - this.fs.getFileStatus(filePath).isDirectory()); - - Path dirPath = this.path("/test/oss/dir"); - this.fs.mkdirs(dirPath); - assertTrue("Should be directory", - this.fs.getFileStatus(dirPath).isDirectory()); - assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile()); - - Path parentPath = this.path("/test/oss"); - for (FileStatus fileStatus: fs.listStatus(parentPath)) { - assertTrue("file and directory should be new", - fileStatus.getModificationTime() > 0L); - } - } - - @Test - public void testMkdirsForExistingFile() throws Exception { - Path testFile = this.path("/test/hadoop/file"); - assertFalse(this.fs.exists(testFile)); - this.createFile(testFile); - assertTrue(this.fs.exists(testFile)); - try { - this.fs.mkdirs(testFile); - fail("Should throw FileAlreadyExistsException!"); - } catch (FileAlreadyExistsException e) { - // expected - } - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java deleted file mode 100644 index 7f4bac25642..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.security.DigestInputStream; -import java.security.DigestOutputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeNotNull; - -/** - * Test the bridging logic between Hadoop's abstract filesystem and - * Aliyun OSS. - */ -public class TestAliyunOSSFileSystemStore { - private Configuration conf; - private AliyunOSSFileSystemStore store; - private AliyunOSSFileSystem fs; - - @Before - public void setUp() throws Exception { - conf = new Configuration(); - fs = new AliyunOSSFileSystem(); - fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf); - store = fs.getStore(); - } - - @After - public void tearDown() throws Exception { - try { - store.purge("test"); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - - @BeforeClass - public static void checkSettings() throws Exception { - Configuration conf = new Configuration(); - assumeNotNull(conf.get(Constants.ACCESS_KEY_ID)); - assumeNotNull(conf.get(Constants.ACCESS_KEY_SECRET)); - assumeNotNull(conf.get("test.fs.oss.name")); - } - - protected void writeRenameReadCompare(Path path, long len) - throws IOException, NoSuchAlgorithmException { - // If len > fs.oss.multipart.upload.threshold, - // we'll use a multipart upload copy - MessageDigest digest = MessageDigest.getInstance("MD5"); - OutputStream out = new BufferedOutputStream( - new DigestOutputStream(fs.create(path, false), digest)); - for (long i = 0; i < len; i++) { - out.write('Q'); - } - out.flush(); - out.close(); - - assertTrue("Exists", fs.exists(path)); - - Path copyPath = path.suffix(".copy"); - fs.rename(path, copyPath); - - assertTrue("Copy exists", fs.exists(copyPath)); - - // Download file from Aliyun OSS and compare the digest against the original - MessageDigest digest2 = MessageDigest.getInstance("MD5"); - InputStream in = new BufferedInputStream( - new DigestInputStream(fs.open(copyPath), digest2)); - long copyLen = 0; - while (in.read() != -1) { - copyLen++; - } - in.close(); - - assertEquals("Copy length matches original", len, copyLen); - assertArrayEquals("Digests match", digest.digest(), digest2.digest()); - } - - @Test - public void testSmallUpload() throws IOException, NoSuchAlgorithmException { - // Regular upload, regular copy - writeRenameReadCompare(new Path("/test/small"), 16384); - } - - @Test - public void testLargeUpload() - throws IOException, NoSuchAlgorithmException { - // Multipart upload, multipart copy - writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java deleted file mode 100644 index d798cafb206..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.fs.FileStatus; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Random; - -import static org.junit.Assert.assertTrue; - -/** - * Tests basic functionality for AliyunOSSInputStream, including seeking and - * reading files. - */ -public class TestAliyunOSSInputStream { - - private FileSystem fs; - - private static final Logger LOG = - LoggerFactory.getLogger(TestAliyunOSSInputStream.class); - - private static String testRootPath = - AliyunOSSTestUtils.generateUniqueTestPath(); - - @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - fs = AliyunOSSTestUtils.createTestFileSystem(conf); - } - - @After - public void tearDown() throws Exception { - if (fs != null) { - fs.delete(new Path(testRootPath), true); - } - } - - private Path setPath(String path) { - if (path.startsWith("/")) { - return new Path(testRootPath + path); - } else { - return new Path(testRootPath + "/" + path); - } - } - - @Test - public void testSeekFile() throws Exception { - Path smallSeekFile = setPath("/test/smallSeekFile.txt"); - long size = 5 * 1024 * 1024; - - ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255); - LOG.info("5MB file created: smallSeekFile.txt"); - - FSDataInputStream instream = this.fs.open(smallSeekFile); - int seekTimes = 5; - LOG.info("multiple fold position seeking test...:"); - for (int i = 0; i < seekTimes; i++) { - long pos = size / (seekTimes - i) - 1; - LOG.info("begin seeking for pos: " + pos); - instream.seek(pos); - assertTrue("expected position at:" + pos + ", but got:" - + instream.getPos(), instream.getPos() == pos); - LOG.info("completed seeking at pos: " + instream.getPos()); - } - LOG.info("random position seeking test...:"); - Random rand = new Random(); - for (int i = 0; i < seekTimes; i++) { - long pos = Math.abs(rand.nextLong()) % size; - LOG.info("begin seeking for pos: " + pos); - instream.seek(pos); - assertTrue("expected position at:" + pos + ", but got:" - + instream.getPos(), instream.getPos() == pos); - LOG.info("completed seeking at pos: " + instream.getPos()); - } - IOUtils.closeStream(instream); - } - - @Test - public void testReadFile() throws Exception { - final int bufLen = 256; - final int sizeFlag = 5; - String filename = "readTestFile_" + sizeFlag + ".txt"; - Path readTestFile = setPath("/test/" + filename); - long size = sizeFlag * 1024 * 1024; - - ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255); - LOG.info(sizeFlag + "MB file created: /test/" + filename); - - FSDataInputStream instream = this.fs.open(readTestFile); - byte[] buf = new byte[bufLen]; - long bytesRead = 0; - while (bytesRead < size) { - int bytes; - if (size - bytesRead < bufLen) { - int remaining = (int)(size - bytesRead); - bytes = instream.read(buf, 0, remaining); - } else { - bytes = instream.read(buf, 0, bufLen); - } - bytesRead += bytes; - - if (bytesRead % (1024 * 1024) == 0) { - int available = instream.available(); - int remaining = (int)(size - bytesRead); - assertTrue("expected remaining:" + remaining + ", but got:" + available, - remaining == available); - LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024)) - + " MB"); - } - } - assertTrue(instream.available() == 0); - IOUtils.closeStream(instream); - } - - @Test - public void testDirectoryModifiedTime() throws Exception { - Path emptyDirPath = setPath("/test/emptyDirectory"); - fs.mkdirs(emptyDirPath); - FileStatus dirFileStatus = fs.getFileStatus(emptyDirPath); - assertTrue("expected the empty dir is new", - dirFileStatus.getModificationTime() > 0L); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java deleted file mode 100644 index 6b87d9ca466..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -import java.io.IOException; - -/** - * Tests regular and multi-part upload functionality for AliyunOSSOutputStream. - */ -public class TestAliyunOSSOutputStream { - private FileSystem fs; - private static String testRootPath = - AliyunOSSTestUtils.generateUniqueTestPath(); - - @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); - conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024); - fs = AliyunOSSTestUtils.createTestFileSystem(conf); - } - - @After - public void tearDown() throws Exception { - if (fs != null) { - fs.delete(new Path(testRootPath), true); - } - } - - protected Path getTestPath() { - return new Path(testRootPath + "/test-aliyun-oss"); - } - - @Test - public void testRegularUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); - } - - @Test - public void testMultiPartUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); - } - - @Test - public void testMultiPartUploadLimit() throws IOException { - long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024); - assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); - - long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024); - assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); - - long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024, - 100 * 1024); - assert(10000 * 100 * 1024 / partSize3 - < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); - - long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024, - 100 * 1024); - assert(10001 * 100 * 1024 / partSize4 - < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java deleted file mode 100644 index 624c606c6b8..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/AliyunOSSContract.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.AbstractBondedFSContract; - -/** - * The contract of Aliyun OSS: only enabled if the test bucket is provided. - */ -public class AliyunOSSContract extends AbstractBondedFSContract { - - public static final String CONTRACT_XML = "contract/aliyun-oss.xml"; - - public AliyunOSSContract(Configuration conf) { - super(conf); - //insert the base features - addConfResource(CONTRACT_XML); - } - - @Override - public String getScheme() { - return "oss"; - } - - @Override - public Path getTestPath() { - String testUniqueForkId = System.getProperty("test.unique.fork.id"); - return testUniqueForkId == null ? super.getTestPath() : - new Path("/" + testUniqueForkId, "test"); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java deleted file mode 100644 index 88dd8cd2267..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractCreate.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractCreateTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Aliyun OSS contract creating tests. - */ -public class TestAliyunOSSContractCreate extends AbstractContractCreateTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java deleted file mode 100644 index 1658d806831..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDelete.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Aliyun OSS contract deleting tests. - */ -public class TestAliyunOSSContractDelete extends AbstractContractDeleteTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java deleted file mode 100644 index 18d09d54149..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -/** - * Contract test suite covering Aliyun OSS integration with DistCp. - */ -public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest { - - private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB - - @Override - protected Configuration createConfiguration() { - Configuration newConf = super.createConfiguration(); - newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); - newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING); - return newConf; - } - - @Override - protected AliyunOSSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java deleted file mode 100644 index c69124d9243..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractGetFileStatus.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Test getFileStatus and related listing operations. - */ -public class TestAliyunOSSContractGetFileStatus - extends AbstractContractGetFileStatusTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java deleted file mode 100644 index 6cb754975ca..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractMkdir.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Aliyun OSS contract directory tests. - */ -public class TestAliyunOSSContractMkdir extends AbstractContractMkdirTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java deleted file mode 100644 index 099aba6296f..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractOpen.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractOpenTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Aliyun OSS contract opening file tests. - */ -public class TestAliyunOSSContractOpen extends AbstractContractOpenTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java deleted file mode 100644 index e15b3ba30de..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRename.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractRenameTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; - -/** - * Aliyun OSS contract renaming tests. - */ -public class TestAliyunOSSContractRename extends AbstractContractRenameTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java deleted file mode 100644 index 9faae374521..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractRootDir.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; - -/** - * Root dir operations against an Aliyun OSS bucket. - */ -public class TestAliyunOSSContractRootDir extends - AbstractContractRootDirectoryTest { - - private static final Logger LOG = - LoggerFactory.getLogger(TestAliyunOSSContractRootDir.class); - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - - @Override - public void testListEmptyRootDirectory() throws IOException { - for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) { - try { - super.testListEmptyRootDirectory(); - break; - } catch (AssertionError | FileNotFoundException e) { - if (attempt < maxAttempts) { - LOG.info("Attempt {} of {} for empty root directory test failed. " - + "Attempting retry.", attempt, maxAttempts); - try { - Thread.sleep(1000); - } catch (InterruptedException e2) { - Thread.currentThread().interrupt(); - fail("Test interrupted."); - break; - } - } else { - LOG.error( - "Empty root directory test failed {} attempts. Failing test.", - maxAttempts); - throw e; - } - } - } - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java deleted file mode 100644 index d9b367440a2..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractSeek.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss.contract; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.AbstractContractSeekTest; -import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.junit.Test; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; -import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; - -/** - * Aliyun OSS contract seeking tests. - */ -public class TestAliyunOSSContractSeek extends AbstractContractSeekTest { - - @Override - protected AbstractFSContract createContract(Configuration conf) { - return new AliyunOSSContract(conf); - } - - @Test - public void testSeekBeyondDownloadSize() throws Throwable { - describe("seek and read beyond download size."); - - Path byteFile = path("byte_file.txt"); - // 'fs.oss.multipart.download.size' = 100 * 1024 - byte[] block = dataset(100 * 1024 + 10, 0, 255); - FileSystem fs = getFileSystem(); - createFile(fs, byteFile, true, block); - - FSDataInputStream instream = getFileSystem().open(byteFile); - instream.seek(100 * 1024 - 1); - assertEquals(100 * 1024 - 1, instream.getPos()); - assertEquals(144, instream.read()); - instream.seek(100 * 1024 + 1); - assertEquals(100 * 1024 + 1, instream.getPos()); - assertEquals(146, instream.read()); - } -} diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml deleted file mode 100644 index 9ec4be6c8af..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml +++ /dev/null @@ -1,120 +0,0 @@ - - - - - - fs.contract.test.random-seek-count - 10 - - - - fs.contract.is-blobstore - true - - - - fs.contract.is-case-sensitive - true - - - - fs.contract.rename-returns-false-if-source-missing - false - - - - fs.contract.rename-remove-dest-if-empty-dir - false - - - - fs.contract.supports-append - false - - - - fs.contract.supports-atomic-directory-delete - false - - - - fs.contract.supports-atomic-rename - false - - - - fs.contract.supports-block-locality - false - - - - fs.contract.supports-concat - false - - - - fs.contract.supports-seek - true - - - - fs.contract.supports-seek-on-closed-file - true - - - - fs.contract.rejects-seek-past-eof - true - - - - fs.contract.supports-strict-exceptions - true - - - - fs.contract.supports-unix-permissions - false - - - - fs.contract.rename-overwrites-dest - true - - - - fs.contract.test.root-tests-enabled - true - - - - fs.contract.supports-getfilestatus - true - - - - fs.oss.multipart.download.size - 102400 - - - - fs.contract.create-visibility-delayed - true - - diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml deleted file mode 100644 index fa4118c2162..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/resources/core-site.xml +++ /dev/null @@ -1,46 +0,0 @@ - - - - - - - hadoop.tmp.dir - target/build/test - A base for other temporary directories. - true - - - - - hadoop.security.authentication - simple - - - - - - - - diff --git a/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties deleted file mode 100644 index bb5cbe5ec32..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/test/resources/log4j.properties +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# log4j configuration used during build and unit tests - -log4j.rootLogger=INFO,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml index 3af7aee392f..3ecc51b1421 100644 --- a/hadoop-tools/hadoop-tools-dist/pom.xml +++ b/hadoop-tools/hadoop-tools-dist/pom.xml @@ -100,12 +100,6 @@ compile ${project.version} - - org.apache.hadoop - hadoop-aliyun - compile - ${project.version} - org.apache.hadoop hadoop-sls diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index 117d12c0914..e307a400926 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -48,7 +48,6 @@ hadoop-aws hadoop-azure hadoop-azure-datalake - hadoop-aliyun