Merge branch 'HADOOP-12756' into trunk

This commit is contained in:
Kai Zheng 2016-10-11 03:22:11 +06:00
commit 669d6f13ec
34 changed files with 3695 additions and 0 deletions

2
.gitignore vendored
View File

@ -31,3 +31,5 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml
patchprocess/
hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml

View File

@ -437,6 +437,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
@ -999,6 +1005,22 @@
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.2.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>

View File

@ -0,0 +1,18 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
</FindBugsFilter>

View File

@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-alpha2-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-aliyun</artifactId>
<name>Apache Hadoop Aliyun OSS support</name>
<packaging>jar</packaging>
<properties>
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
</properties>
<profiles>
<profile>
<id>tests-off</id>
<activation>
<file>
<missing>src/test/resources/auth-keys.xml</missing>
</file>
</activation>
<properties>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>tests-on</id>
<activation>
<file>
<exists>src/test/resources/auth-keys.xml</exists>
</file>
</activation>
<properties>
<maven.test.skip>false</maven.test.skip>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput>
<excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
</excludeFilterFile>
<effort>Max</effort>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<configuration>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>deplist</id>
<phase>compile</phase>
<goals>
<goal>list</goal>
</goals>
<configuration>
<!-- build a shellprofile -->
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Support session credentials for authenticating with Aliyun.
*/
public class AliyunCredentialsProvider implements CredentialsProvider {
private Credentials credentials = null;
public AliyunCredentialsProvider(Configuration conf)
throws IOException {
String accessKeyId;
String accessKeySecret;
String securityToken;
try {
accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
} catch (IOException e) {
throw new InvalidCredentialsException(e);
}
try {
securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN);
} catch (IOException e) {
securityToken = null;
}
if (StringUtils.isEmpty(accessKeyId)
|| StringUtils.isEmpty(accessKeySecret)) {
throw new InvalidCredentialsException(
"AccessKeyId and AccessKeySecret should not be null or empty.");
}
if (StringUtils.isNotEmpty(securityToken)) {
credentials = new DefaultCredentials(accessKeyId, accessKeySecret,
securityToken);
} else {
credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
}
}
@Override
public void setCredentials(Credentials creds) {
if (creds == null) {
throw new InvalidCredentialsException("Credentials should not be null.");
}
credentials = creds;
}
@Override
public Credentials getCredentials() {
if (credentials == null) {
throw new InvalidCredentialsException("Invalid credentials");
}
return credentials;
}
}

View File

@ -0,0 +1,580 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
* Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
*/
public class AliyunOSSFileSystem extends FileSystem {
private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileSystem.class);
private URI uri;
private String bucket;
private Path workingDir;
private AliyunOSSFileSystemStore store;
private int maxKeys;
@Override
public FSDataOutputStream append(Path path, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Append is not supported!");
}
@Override
public void close() throws IOException {
try {
store.close();
} finally {
super.close();
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
String key = pathToKey(path);
FileStatus status = null;
try {
// get the status or throw a FNFE
status = getFileStatus(path);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
// path references a directory
throw new FileAlreadyExistsException(path + " is a directory");
}
if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(path + " already exists");
}
LOG.debug("Overwriting file {}", path);
} catch (FileNotFoundException e) {
// this means the file is not found
}
return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
store, key, progress, statistics), (Statistics)(null));
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
try {
return innerDelete(getFileStatus(path), recursive);
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", path);
return false;
}
}
/**
* Delete an object. See {@link #delete(Path, boolean)}.
*
* @param status fileStatus object
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException due to inability to delete a directory or file.
*/
private boolean innerDelete(FileStatus status, boolean recursive)
throws IOException {
Path f = status.getPath();
String p = f.toUri().getPath();
FileStatus[] statuses;
// indicating root directory "/".
if (p.equals("/")) {
statuses = listStatus(status.getPath());
boolean isEmptyDir = statuses.length <= 0;
return rejectRootDirectoryDelete(isEmptyDir, recursive);
}
String key = pathToKey(f);
if (status.isDirectory()) {
if (!recursive) {
// Check whether it is an empty directory or not
statuses = listStatus(status.getPath());
if (statuses.length > 0) {
throw new IOException("Cannot remove directory " + f +
": It is not empty!");
} else {
// Delete empty directory without '-r'
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
store.deleteObject(key);
}
} else {
store.deleteDirs(key);
}
} else {
store.deleteObject(key);
}
createFakeDirectoryIfNecessary(f);
return true;
}
/**
* Implements the specific logic to reject root directory deletion.
* The caller must return the result of this call, rather than
* attempt to continue with the delete operation: deleting root
* directories is never allowed. This method simply implements
* the policy of when to return an exit code versus raise an exception.
* @param isEmptyDir empty directory or not
* @param recursive recursive flag from command
* @return a return code for the operation
* @throws PathIOException if the operation was explicitly rejected.
*/
private boolean rejectRootDirectoryDelete(boolean isEmptyDir,
boolean recursive) throws IOException {
LOG.info("oss delete the {} root directory of {}", bucket, recursive);
if (isEmptyDir) {
return true;
}
if (recursive) {
return false;
} else {
// reject
throw new PathIOException(bucket, "Cannot delete root path");
}
}
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (StringUtils.isNotEmpty(key) && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
mkdir(pathToKey(f.getParent()));
}
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
Path qualifiedPath = path.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
// Root always exists
if (key.length() == 0) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
}
ObjectMetadata meta = store.getObjectMetadata(key);
// If key not found and key does not end with "/"
if (meta == null && !key.endsWith("/")) {
// In case of 'dir + "/"'
key += "/";
meta = store.getObjectMetadata(key);
}
if (meta == null) {
ObjectListing listing = store.listObjects(key, 1, null, false);
if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
} else {
throw new FileNotFoundException(path + ": No such file or directory!");
}
} else if (objectRepresentsDirectory(key, meta.getContentLength())) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
} else {
return new FileStatus(meta.getContentLength(), false, 1,
getDefaultBlockSize(path), meta.getLastModified().getTime(),
qualifiedPath);
}
}
@Override
public String getScheme() {
return "oss";
}
@Override
public URI getUri() {
return uri;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Deprecated
public long getDefaultBlockSize() {
return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
/**
* Initialize new FileSystem.
*
* @param name the uri of the file system, including host, port, etc.
* @param conf configuration of the file system
* @throws IOException IO problems
*/
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
bucket = name.getHost();
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user",
System.getProperty("user.name")).makeQualified(uri, null);
store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
setConf(conf);
}
/**
* Check if OSS object represents a directory.
*
* @param name object key
* @param size object content length
* @return true if object represents a directory
*/
private boolean objectRepresentsDirectory(final String name,
final long size) {
return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
}
/**
* Turn a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
private String pathToKey(Path path) {
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
return path.toUri().getPath().substring(1);
}
private Path keyToPath(String key) {
return new Path("/" + key);
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
String key = pathToKey(path);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + path);
}
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
ObjectListing objects = store.listObjects(key, maxKeys, null, false);
while (true) {
statistics.incrementReadOps(1);
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String objKey = objectSummary.getKey();
if (objKey.equals(key + "/")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + objKey);
}
continue;
} else {
Path keyPath = keyToPath(objectSummary.getKey())
.makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath);
}
result.add(new FileStatus(objectSummary.getSize(), false, 1,
getDefaultBlockSize(keyPath),
objectSummary.getLastModified().getTime(), keyPath));
}
}
for (String prefix : objects.getCommonPrefixes()) {
if (prefix.equals(key + "/")) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + prefix);
}
continue;
} else {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath);
}
result.add(new FileStatus(0, true, 1, 0, 0, keyPath));
}
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
String nextMarker = objects.getNextMarker();
objects = store.listObjects(key, maxKeys, nextMarker, false);
statistics.incrementReadOps(1);
} else {
break;
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd (not a dir): " + path);
}
result.add(fileStatus);
}
return result.toArray(new FileStatus[result.size()]);
}
/**
* Used to create an empty file that represents an empty directory.
*
* @param key directory path
* @return true if directory is successfully created
* @throws IOException
*/
private boolean mkdir(final String key) throws IOException {
String dirName = key;
if (StringUtils.isNotEmpty(key)) {
if (!key.endsWith("/")) {
dirName += "/";
}
store.storeEmptyFile(dirName);
}
return true;
}
@Override
public boolean mkdirs(Path path, FsPermission permission)
throws IOException {
try {
FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
return true;
} else {
throw new FileAlreadyExistsException("Path is a file: " + path);
}
} catch (FileNotFoundException e) {
validatePath(path);
String key = pathToKey(path);
return mkdir(key);
}
}
/**
* Check whether the path is a valid path.
*
* @param path the path to be checked.
* @throws IOException
*/
private void validatePath(Path path) throws IOException {
Path fPart = path.getParent();
do {
try {
FileStatus fileStatus = getFileStatus(fPart);
if (fileStatus.isDirectory()) {
// If path exists and a directory, exit
break;
} else {
throw new FileAlreadyExistsException(String.format(
"Can't make directory for path '%s', it is a file.", fPart));
}
} catch (FileNotFoundException fnfe) {
}
fPart = fPart.getParent();
} while (fPart != null);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path +
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
pathToKey(path), fileStatus.getLen(), statistics));
}
@Override
public boolean rename(Path srcPath, Path dstPath) throws IOException {
if (srcPath.isRoot()) {
// Cannot rename root of file system
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename the root of a filesystem");
}
return false;
}
Path parent = dstPath.getParent();
while (parent != null && !srcPath.equals(parent)) {
parent = parent.getParent();
}
if (parent != null) {
return false;
}
FileStatus srcStatus = getFileStatus(srcPath);
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dstPath);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (dstStatus == null) {
// If dst doesn't exist, check whether dst dir exists or not
dstStatus = getFileStatus(dstPath.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", srcPath, dstPath,
dstPath.getParent()));
}
} else {
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory
dstPath = new Path(dstPath, srcPath.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dstPath);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory / not empty
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists or not empty!",
srcPath, dstPath));
}
} else {
// If dst is not a directory
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists!", srcPath,
dstPath));
}
}
if (srcStatus.isDirectory()) {
copyDirectory(srcPath, dstPath);
} else {
copyFile(srcPath, dstPath);
}
return srcPath.equals(dstPath) || delete(srcPath, true);
}
/**
* Copy file from source path to destination path.
* (the caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcPath source path.
* @param dstPath destination path.
* @return true if file is successfully copied.
*/
private boolean copyFile(Path srcPath, Path dstPath) {
String srcKey = pathToKey(srcPath);
String dstKey = pathToKey(dstPath);
return store.copyFile(srcKey, dstKey);
}
/**
* Copy a directory from source path to destination path.
* (the caller should make sure srcPath is a directory, and dstPath is valid)
*
* @param srcPath source path.
* @param dstPath destination path.
* @return true if directory is successfully copied.
*/
private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
String srcKey = AliyunOSSUtils
.maybeAddTrailingSlash(pathToKey(srcPath));
String dstKey = AliyunOSSUtils
.maybeAddTrailingSlash(pathToKey(dstPath));
if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename a directory to a subdirectory of self");
}
return false;
}
store.storeEmptyFile(dstKey);
ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
statistics.incrementReadOps(1);
// Copy files from src folder to dst
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String newKey =
dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
store.copyFile(objectSummary.getKey(), newKey);
}
if (objects.isTruncated()) {
String nextMarker = objects.getNextMarker();
objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
statistics.incrementReadOps(1);
} else {
break;
}
}
return true;
}
@Override
public void setWorkingDirectory(Path dir) {
this.workingDir = dir;
}
public AliyunOSSFileSystemStore getStore() {
return store;
}
}

View File

@ -0,0 +1,516 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.ClientConfiguration;
import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.comm.Protocol;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.CannedAccessControlList;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.CopyObjectResult;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;
import com.aliyun.oss.model.UploadPartCopyRequest;
import com.aliyun.oss.model.UploadPartCopyResult;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Core implementation of Aliyun OSS Filesystem for Hadoop.
* Provides the bridging logic between Hadoop's abstract filesystem and
* Aliyun OSS.
*/
public class AliyunOSSFileSystemStore {
public static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
private FileSystem.Statistics statistics;
private OSSClient ossClient;
private String bucketName;
private long uploadPartSize;
private long multipartThreshold;
private long partSize;
private int maxKeys;
private String serverSideEncryptionAlgorithm;
public void initialize(URI uri, Configuration conf,
FileSystem.Statistics stat) throws IOException {
statistics = stat;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
MAXIMUM_CONNECTIONS_DEFAULT));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
SECURE_CONNECTIONS_DEFAULT);
clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
MAX_ERROR_RETRIES_DEFAULT));
clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
ESTABLISH_TIMEOUT_DEFAULT));
clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
SOCKET_TIMEOUT_DEFAULT));
String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
if (StringUtils.isNotEmpty(proxyHost)) {
clientConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
clientConf.setProxyPort(proxyPort);
} else {
if (secureConnections) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
clientConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
clientConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
PROXY_PASSWORD_KEY + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
clientConf.setProxyUsername(proxyUsername);
clientConf.setProxyPassword(proxyPassword);
clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
} else if (proxyPort >= 0) {
String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
PROXY_HOST_KEY;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(conf);
ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
}
serverSideEncryptionAlgorithm =
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
if (uploadPartSize < 5 * 1024 * 1024) {
LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
uploadPartSize = 5 * 1024 * 1024;
}
if (multipartThreshold < 5 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
multipartThreshold = 5 * 1024 * 1024;
}
if (multipartThreshold > 1024 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
multipartThreshold = 1024 * 1024 * 1024;
}
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (StringUtils.isNotEmpty(cannedACLName)) {
CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL);
}
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
bucketName = uri.getHost();
}
/**
* Delete an object, and update write operation statistics.
*
* @param key key to blob to delete.
*/
public void deleteObject(String key) {
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
/**
* Delete a list of keys, and update write operation statistics.
*
* @param keysToDelete collection of keys to delete.
*/
public void deleteObjects(List<String> keysToDelete) {
if (CollectionUtils.isNotEmpty(keysToDelete)) {
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucketName);
deleteRequest.setKeys(keysToDelete);
ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(keysToDelete.size());
}
}
/**
* Delete a directory from Aliyun OSS.
*
* @param key directory key to delete.
*/
public void deleteDirs(String key) {
key = AliyunOSSUtils.maybeAddTrailingSlash(key);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key);
listRequest.setDelimiter(null);
listRequest.setMaxKeys(maxKeys);
while (true) {
ObjectListing objects = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
List<String> keysToDelete = new ArrayList<String>();
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
keysToDelete.add(objectSummary.getKey());
}
deleteObjects(keysToDelete);
if (objects.isTruncated()) {
listRequest.setMarker(objects.getNextMarker());
} else {
break;
}
}
}
/**
* Return metadata of a given object key.
*
* @param key object key.
* @return return null if key does not exist.
*/
public ObjectMetadata getObjectMetadata(String key) {
try {
return ossClient.getObjectMetadata(bucketName, key);
} catch (OSSException osse) {
return null;
} finally {
statistics.incrementReadOps(1);
}
}
/**
* Upload an empty file as an OSS object, using single upload.
*
* @param key object key.
* @throws IOException if failed to upload object.
*/
public void storeEmptyFile(String key) throws IOException {
ObjectMetadata dirMeta = new ObjectMetadata();
byte[] buffer = new byte[0];
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
dirMeta.setContentLength(0);
try {
ossClient.putObject(bucketName, key, in, dirMeta);
} finally {
in.close();
}
}
/**
* Copy an object from source key to destination key.
*
* @param srcKey source key.
* @param dstKey destination key.
* @return true if file is successfully copied.
*/
public boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
long contentLength = objectMeta.getContentLength();
if (contentLength <= multipartThreshold) {
return singleCopy(srcKey, dstKey);
} else {
return multipartCopy(srcKey, contentLength, dstKey);
}
}
/**
* Use single copy to copy an OSS object.
* (The caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcKey source key.
* @param dstKey destination key.
* @return true if object is successfully copied.
*/
private boolean singleCopy(String srcKey, String dstKey) {
CopyObjectResult copyResult =
ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
LOG.debug(copyResult.getETag());
return true;
}
/**
* Use multipart copy to copy an OSS object.
* (The caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcKey source key.
* @param contentLength data size of the object to copy.
* @param dstKey destination key.
* @return true if success, or false if upload is aborted.
*/
private boolean multipartCopy(String srcKey, long contentLength,
String dstKey) {
long realPartSize =
AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
int partNum = (int) (contentLength / realPartSize);
if (contentLength % realPartSize != 0) {
partNum++;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, dstKey);
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
String uploadId = initiateMultipartUploadResult.getUploadId();
List<PartETag> partETags = new ArrayList<PartETag>();
try {
for (int i = 0; i < partNum; i++) {
long skipBytes = realPartSize * i;
long size = (realPartSize < contentLength - skipBytes) ?
realPartSize : contentLength - skipBytes;
UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
partCopyRequest.setSourceBucketName(bucketName);
partCopyRequest.setSourceKey(srcKey);
partCopyRequest.setBucketName(bucketName);
partCopyRequest.setKey(dstKey);
partCopyRequest.setUploadId(uploadId);
partCopyRequest.setPartSize(size);
partCopyRequest.setBeginIndex(skipBytes);
partCopyRequest.setPartNumber(i + 1);
UploadPartCopyResult partCopyResult =
ossClient.uploadPartCopy(partCopyRequest);
statistics.incrementWriteOps(1);
partETags.add(partCopyResult.getPartETag());
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, dstKey,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
return true;
} catch (OSSException | ClientException e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
return false;
}
}
/**
* Upload a file as an OSS object, using single upload.
*
* @param key object key.
* @param file local file to upload.
* @throws IOException if failed to upload object.
*/
public void uploadObject(String key, File file) throws IOException {
File object = file.getAbsoluteFile();
FileInputStream fis = new FileInputStream(object);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(object.length());
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
try {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag());
statistics.incrementWriteOps(1);
} finally {
fis.close();
}
}
/**
* Upload a file as an OSS object, using multipart upload.
*
* @param key object key.
* @param file local file to upload.
* @throws IOException if failed to upload object.
*/
public void multipartUploadObject(String key, File file) throws IOException {
File object = file.getAbsoluteFile();
long dataLen = object.length();
long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
int partNum = (int) (dataLen / realPartSize);
if (dataLen % realPartSize != 0) {
partNum += 1;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
List<PartETag> partETags = new ArrayList<PartETag>();
String uploadId = initiateMultipartUploadResult.getUploadId();
try {
for (int i = 0; i < partNum; i++) {
// TODO: Optimize this, avoid opening the object multiple times
FileInputStream fis = new FileInputStream(object);
try {
long skipBytes = realPartSize * i;
AliyunOSSUtils.skipFully(fis, skipBytes);
long size = (realPartSize < dataLen - skipBytes) ?
realPartSize : dataLen - skipBytes;
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(key);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(fis);
uploadPartRequest.setPartSize(size);
uploadPartRequest.setPartNumber(i + 1);
UploadPartResult uploadPartResult =
ossClient.uploadPart(uploadPartRequest);
statistics.incrementWriteOps(1);
partETags.add(uploadPartResult.getPartETag());
} finally {
fis.close();
}
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
} catch (OSSException | ClientException e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, key, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
}
}
/**
* list objects.
*
* @param prefix prefix.
* @param maxListingLength max no. of entries
* @param marker last key in any previous search.
* @param recursive whether to list directory recursively.
* @return a list of matches.
*/
public ObjectListing listObjects(String prefix, int maxListingLength,
String marker, boolean recursive) {
String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(prefix);
listRequest.setDelimiter(delimiter);
listRequest.setMaxKeys(maxListingLength);
listRequest.setMarker(marker);
ObjectListing listing = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
return listing;
}
/**
* Retrieve a part of an object.
*
* @param key the object name that is being retrieved from the Aliyun OSS.
* @param byteStart start position.
* @param byteEnd end position.
* @return This method returns null if the key is not found.
*/
public InputStream retrieve(String key, long byteStart, long byteEnd) {
try {
GetObjectRequest request = new GetObjectRequest(bucketName, key);
request.setRange(byteStart, byteEnd);
return ossClient.getObject(request).getObjectContent();
} catch (OSSException | ClientException e) {
return null;
}
}
/**
* Close OSS client properly.
*/
public void close() {
if (ossClient != null) {
ossClient.shutdown();
ossClient = null;
}
}
/**
* Clean up all objects matching the prefix.
*
* @param prefix Aliyun OSS object prefix.
*/
public void purge(String prefix) {
String key;
try {
ObjectListing objects = listObjects(prefix, maxKeys, null, true);
for (OSSObjectSummary object : objects.getObjectSummaries()) {
key = object.getKey();
ossClient.deleteObject(bucketName, key);
}
for (String dir: objects.getCommonPrefixes()) {
deleteDirs(dir);
}
} catch (OSSException | ClientException e) {
LOG.error("Failed to purge " + prefix);
}
}
}

View File

@ -0,0 +1,260 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* The input stream for OSS blob system.
* The class uses multi-part downloading to read data from the object content
* stream.
*/
public class AliyunOSSInputStream extends FSInputStream {
public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
private final long downloadPartSize;
private AliyunOSSFileSystemStore store;
private final String key;
private Statistics statistics;
private boolean closed;
private InputStream wrappedStream = null;
private long contentLength;
private long position;
private long partRemaining;
public AliyunOSSInputStream(Configuration conf,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
reopen(0);
closed = false;
}
/**
* Reopen the wrapped stream at give position, by seeking for
* data of a part length from object content stream.
*
* @param pos position from start of a file
* @throws IOException if failed to reopen
*/
private synchronized void reopen(long pos) throws IOException {
long partSize;
if (pos < 0) {
throw new EOFException("Cannot seek at negative position:" + pos);
} else if (pos > contentLength) {
throw new EOFException("Cannot seek after EOF, contentLength:" +
contentLength + " position:" + pos);
} else if (pos + downloadPartSize > contentLength) {
partSize = contentLength - pos;
} else {
partSize = downloadPartSize;
}
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.close();
}
wrappedStream = store.retrieve(key, pos, pos + partSize -1);
if (wrappedStream == null) {
throw new IOException("Null IO stream");
}
position = pos;
partRemaining = partSize;
}
@Override
public synchronized int read() throws IOException {
checkNotClosed();
if (partRemaining <= 0 && position < contentLength) {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int byteRead = -1;
do {
retry = false;
try {
byteRead = wrappedStream.read();
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (byteRead >= 0) {
position++;
partRemaining--;
}
if (statistics != null && byteRead >= 0) {
statistics.incrementBytesRead(1);
}
return byteRead;
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
*
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
checkNotClosed();
if (buf == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > buf.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int bytesRead = 0;
// Not EOF, and read not done
while (position < contentLength && bytesRead < len) {
if (partRemaining == 0) {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int bytes = -1;
do {
retry = false;
try {
bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (bytes > 0) {
bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
throw new IOException("Failed to read from stream. Remaining:" +
partRemaining);
}
}
if (statistics != null && bytesRead > 0) {
statistics.incrementBytesRead(bytesRead);
}
// Read nothing, but attempt to read something
if (bytesRead == 0 && len > 0) {
return -1;
} else {
return bytesRead;
}
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (wrappedStream != null) {
wrappedStream.close();
}
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = contentLength - position;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}
@Override
public synchronized void seek(long pos) throws IOException {
checkNotClosed();
if (position == pos) {
return;
} else if (pos > position && pos < position + partRemaining) {
AliyunOSSUtils.skipFully(wrappedStream, pos - position);
position = pos;
} else {
reopen(pos);
}
}
@Override
public synchronized long getPos() throws IOException {
checkNotClosed();
return position;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
checkNotClosed();
return false;
}
private void handleReadException(Exception e, int tries) throws IOException{
if (tries == 0) {
throw new IOException(e);
}
LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
" connection at position '" + position + "', " + e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
LOG.warn(e2.getMessage());
}
reopen(position);
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* The output stream for OSS blob system.
* Data will be buffered on local disk, then uploaded to OSS in
* {@link #close()} method.
*/
public class AliyunOSSOutputStream extends OutputStream {
public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
private AliyunOSSFileSystemStore store;
private final String key;
private Statistics statistics;
private Progressable progress;
private long partSizeThreshold;
private LocalDirAllocator dirAlloc;
private boolean closed;
private File tmpFile;
private BufferedOutputStream backupStream;
public AliyunOSSOutputStream(Configuration conf,
AliyunOSSFileSystemStore store, String key, Progressable progress,
Statistics statistics) throws IOException {
this.store = store;
this.key = key;
// The caller cann't get any progress information
this.progress = progress;
this.statistics = statistics;
partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
}
dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
closed = false;
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (backupStream != null) {
backupStream.close();
}
long dataLen = tmpFile.length();
try {
if (dataLen <= partSizeThreshold) {
store.uploadObject(key, tmpFile);
} else {
store.multipartUploadObject(key, tmpFile);
}
} finally {
if (!tmpFile.delete()) {
LOG.warn("Can not delete file: " + tmpFile);
}
}
}
@Override
public synchronized void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void write(int b) throws IOException {
backupStream.write(b);
statistics.incrementBytesWritten(1);
}
}

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.IOException;
import java.io.InputStream;
import com.aliyun.oss.common.auth.CredentialsProvider;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.ProviderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Utility methods for Aliyun OSS code.
*/
final public class AliyunOSSUtils {
private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSUtils.class);
private AliyunOSSUtils() {
}
/**
* Used to get password from configuration.
*
* @param conf configuration that contains password information
* @param key the key of the password
* @return the value for the key
* @throws IOException if failed to get password from configuration
*/
public static String getValueWithKey(Configuration conf, String key)
throws IOException {
try {
final char[] pass = conf.getPassword(key);
if (pass != null) {
return (new String(pass)).trim();
} else {
return "";
}
} catch (IOException ioe) {
throw new IOException("Cannot find password option " + key, ioe);
}
}
/**
* Skip the requested number of bytes or fail if there are no enough bytes
* left. This allows for the possibility that {@link InputStream#skip(long)}
* may not skip as many bytes as requested (most likely because of reaching
* EOF).
*
* @param is the input stream to skip.
* @param n the number of bytes to skip.
* @throws IOException thrown when skipped less number of bytes.
*/
public static void skipFully(InputStream is, long n) throws IOException {
long total = 0;
long cur = 0;
do {
cur = is.skip(n - total);
total += cur;
} while((total < n) && (cur > 0));
if (total < n) {
throw new IOException("Failed to skip " + n + " bytes, possibly due " +
"to EOF.");
}
}
/**
* Calculate a proper size of multipart piece. If <code>minPartSize</code>
* is too small, the number of multipart pieces may exceed the limit of
* {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
*
* @param contentLength the size of file.
* @param minPartSize the minimum size of multipart piece.
* @return a revisional size of multipart piece.
*/
public static long calculatePartSize(long contentLength, long minPartSize) {
long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
return Math.max(minPartSize, tmpPartSize);
}
/**
* Create credential provider specified by configuration, or create default
* credential provider if not specified.
*
* @param conf configuration
* @return a credential provider
* @throws IOException on any problem. Class construction issues may be
* nested inside the IOE.
*/
public static CredentialsProvider getCredentialsProvider(Configuration conf)
throws IOException {
CredentialsProvider credentials;
String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
if (StringUtils.isEmpty(className)) {
Configuration newConf =
ProviderUtils.excludeIncompatibleCredentialProviders(conf,
AliyunOSSFileSystem.class);
credentials = new AliyunCredentialsProvider(newConf);
} else {
try {
LOG.debug("Credential provider class is:" + className);
Class<?> credClass = Class.forName(className);
try {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor(
Configuration.class).newInstance(conf);
} catch (NoSuchMethodException | SecurityException e) {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor()
.newInstance();
}
} catch (ClassNotFoundException e) {
throw new IOException(className + " not found.", e);
} catch (NoSuchMethodException | SecurityException e) {
throw new IOException(String.format("%s constructor exception. A " +
"class specified in %s must provide an accessible constructor " +
"accepting URI and Configuration, or an accessible default " +
"constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
e);
} catch (ReflectiveOperationException | IllegalArgumentException e) {
throw new IOException(className + " instantiation exception.", e);
}
}
return credentials;
}
/**
* Turns a path (relative or otherwise) into an OSS key, adding a trailing
* "/" if the path is not the root <i>and</i> does not already have a "/"
* at the end.
*
* @param key OSS key or ""
* @return the with a trailing "/", or, if it is the root key, "".
*/
public static String maybeAddTrailingSlash(String key) {
if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) {
return key + '/';
} else {
return key;
}
}
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
/**
* ALL configuration constants for OSS filesystem.
*/
public final class Constants {
private Constants() {
}
// Class of credential provider
public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
"fs.oss.credentials.provider";
// OSS access verification
public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
public static final String SECURITY_TOKEN = "fs.oss.securityToken";
// Number of simultaneous connections to oss
public static final String MAXIMUM_CONNECTIONS_KEY =
"fs.oss.connection.maximum";
public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
// Connect to oss over ssl
public static final String SECURE_CONNECTIONS_KEY =
"fs.oss.connection.secure.enabled";
public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
// Use a custom endpoint
public static final String ENDPOINT_KEY = "fs.oss.endpoint";
// Connect to oss through a proxy server
public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
public static final String PROXY_WORKSTATION_KEY =
"fs.oss.proxy.workstation";
// Number of times we should retry errors
public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
// Time until we give up trying to establish a connection to oss
public static final String ESTABLISH_TIMEOUT_KEY =
"fs.oss.connection.establish.timeout";
public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
// Time until we give up on a connection to oss
public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
// Number of records to get while paging through a directory listing
public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
// Size of each of or multipart pieces in bytes
public static final String MULTIPART_UPLOAD_SIZE_KEY =
"fs.oss.multipart.upload.size";
public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
// Minimum size in bytes before we start a multipart uploads or copy
public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
"fs.oss.multipart.upload.threshold";
public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
20 * 1024 * 1024;
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
// private | public-read | public-read-write
public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
public static final String CANNED_ACL_DEFAULT = "";
// OSS server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
"fs.oss.server-side-encryption-algorithm";
public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
public static final String FS_OSS = "oss";
public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
public static final int MAX_RETRIES = 10;
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Aliyun OSS Filesystem.
*/
package org.apache.hadoop.fs.aliyun.oss;

View File

@ -0,0 +1,294 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# Hadoop-Aliyun module: Integration with Aliyun Web Services
<!-- MACRO{toc|fromDepth=0|toDepth=5} -->
## Overview
The `hadoop-aliyun` module provides support for Aliyun integration with
[Aliyun Object Storage Service (Aliyun OSS)](https://www.aliyun.com/product/oss).
The generated JAR file, `hadoop-aliyun.jar` also declares a transitive
dependency on all external artifacts which are needed for this support — enabling
downstream applications to easily use this support.
To make it part of Apache Hadoop's default classpath, simply make sure
that HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aliyun' in the list.
### Features
* Read and write data stored in Aliyun OSS.
* Present a hierarchical file system view by implementing the standard Hadoop
[`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface.
* Can act as a source of data in a MapReduce job, or a sink.
### Warning #1: Object Stores are not filesystems.
Aliyun OSS is an example of "an object store". In order to achieve scalability
and especially high availability, Aliyun OSS has relaxed some of the constraints
which classic "POSIX" filesystems promise.
Specifically
1. Atomic operations: `delete()` and `rename()` are implemented by recursive
file-by-file operations. They take time at least proportional to the number of files,
during which time partial updates may be visible. `delete()` and `rename()`
can not guarantee atomicity. If the operations are interrupted, the filesystem
is left in an intermediate state.
2. File owner and group are persisted, but the permissions model is not enforced.
Authorization occurs at the level of the entire Aliyun account via
[Aliyun Resource Access Management (Aliyun RAM)](https://www.aliyun.com/product/ram).
3. Directory last access time is not tracked.
4. The append operation is not supported.
### Warning #2: Directory last access time is not tracked,
features of Hadoop relying on this can have unexpected behaviour. E.g. the
AggregatedLogDeletionService of YARN will not remove the appropriate logfiles.
### Warning #3: Your Aliyun credentials are valuable
Your Aliyun credentials not only pay for services, they offer read and write
access to the data. Anyone with the account can not only read your datasets
—they can delete them.
Do not inadvertently share these credentials through means such as
1. Checking in to SCM any configuration files containing the secrets.
2. Logging them to a console, as they invariably end up being seen.
3. Defining filesystem URIs with the credentials in the URL, such as
`oss://accessKeyId:accessKeySecret@directory/file`. They will end up in
logs and error messages.
4. Including the secrets in bug reports.
If you do any of these: change your credentials immediately!
### Warning #4: The Aliyun OSS client provided by Aliyun E-MapReduce are different from this implementation
Specifically: on Aliyun E-MapReduce, `oss://` is also supported but with
a different implementation. If you are using Aliyun E-MapReduce,
follow these instructions —and be aware that all issues related to Aliyun
OSS integration in E-MapReduce can only be addressed by Aliyun themselves:
please raise your issues with them.
## OSS
### Authentication properties
<property>
<name>fs.oss.accessKeyId</name>
<description>Aliyun access key ID</description>
</property>
<property>
<name>fs.oss.accessKeySecret</name>
<description>Aliyun access key secret</description>
</property>
<property>
<name>fs.oss.credentials.provider</name>
<description>
Class name of a credentials provider that implements
com.aliyun.oss.common.auth.CredentialsProvider. Omit if using access/secret keys
or another authentication mechanism. The specified class must provide an
accessible constructor accepting java.net.URI and
org.apache.hadoop.conf.Configuration, or an accessible default constructor.
</description>
</property>
### Other properties
<property>
<name>fs.oss.endpoint</name>
<description>Aliyun OSS endpoint to connect to. An up-to-date list is
provided in the Aliyun OSS Documentation.
</description>
</property>
<property>
<name>fs.oss.proxy.host</name>
<description>Hostname of the (optinal) proxy server for Aliyun OSS connection</description>
</property>
<property>
<name>fs.oss.proxy.port</name>
<description>Proxy server port</description>
</property>
<property>
<name>fs.oss.proxy.username</name>
<description>Username for authenticating with proxy server</description>
</property>
<property>
<name>fs.oss.proxy.password</name>
<description>Password for authenticating with proxy server.</description>
</property>
<property>
<name>fs.oss.proxy.domain</name>
<description>Domain for authenticating with proxy server.</description>
</property>
<property>
<name>fs.oss.proxy.workstation</name>
<description>Workstation for authenticating with proxy server.</description>
</property>
<property>
<name>fs.oss.attempts.maximum</name>
<value>20</value>
<description>How many times we should retry commands on transient errors.</description>
</property>
<property>
<name>fs.oss.connection.establish.timeout</name>
<value>50000</value>
<description>Connection setup timeout in milliseconds.</description>
</property>
<property>
<name>fs.oss.connection.timeout</name>
<value>200000</value>
<description>Socket connection timeout in milliseconds.</description>
</property>
<property>
<name>fs.oss.paging.maximum</name>
<value>1000</value>
<description>How many keys to request from Aliyun OSS when doing directory listings at a time.
</description>
</property>
<property>
<name>fs.oss.multipart.upload.size</name>
<value>10485760</value>
<description>Size of each of multipart pieces in bytes.</description>
</property>
<property>
<name>fs.oss.multipart.upload.threshold</name>
<value>20971520</value>
<description>Minimum size in bytes before we start a multipart uploads or copy.</description>
</property>
<property>
<name>fs.oss.multipart.download.size</name>
<value>102400/value>
<description>Size in bytes in each request from ALiyun OSS.</description>
</property>
<property>
<name>fs.oss.buffer.dir</name>
<description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description>
</property>
<property>
<name>fs.oss.acl.default</name>
<value></vaule>
<description>Set a canned ACL for bucket. Value may be private, public-read, public-read-write.
</description>
</property>
<property>
<name>fs.oss.server-side-encryption-algorithm</name>
<value></vaule>
<description>Specify a server-side encryption algorithm for oss: file system.
Unset by default, and the only other currently allowable value is AES256.
</description>
</property>
<property>
<name>fs.oss.connection.maximum</name>
<value>32</value>
<description>Number of simultaneous connections to oss.</description>
</property>
<property>
<name>fs.oss.connection.secure.enabled</name>
<value>true</value>
<description>Connect to oss over ssl or not, true by default.</description>
</property>
## Testing the hadoop-aliyun Module
To test `oss://` filesystem client, two files which pass in authentication
details to the test runner are needed.
1. `auth-keys.xml`
2. `core-site.xml`
Those two configuration files must be put into
`hadoop-tools/hadoop-aliyun/src/test/resources`.
### `core-site.xml`
This file pre-exists and sources the configurations created in `auth-keys.xml`.
For most cases, no modification is needed, unless a specific, non-default property
needs to be set during the testing.
### `auth-keys.xml`
This file triggers the testing of Aliyun OSS module. Without this file,
*none of the tests in this module will be executed*
It contains the access key Id/secret and proxy information that are needed to
connect to Aliyun OSS, and an OSS bucket URL should be also provided.
1. `test.fs.oss.name` : the URL of the bucket for Aliyun OSS tests
The contents of the bucket will be cleaned during the testing process, so
do not use the bucket for any purpose other than testing.
### Run Hadoop contract tests
Create file `contract-test-options.xml` under `/test/resources`. If a
specific file `fs.contract.test.fs.oss` test path is not defined, those
tests will be skipped. Credentials are also needed to run any of those
tests, they can be copied from `auth-keys.xml` or through direct
XInclude inclusion. Here is an example of `contract-test-options.xml`:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<include xmlns="http://www.w3.org/2001/XInclude"
href="auth-keys.xml"/>
<property>
<name>fs.contract.test.fs.oss</name>
<value>oss://spark-tests</value>
</property>
<property>
<name>fs.oss.impl</name>
<value>org.apache.hadoop.fs.aliyun.AliyunOSSFileSystem</value>
</property>
<property>
<name>fs.oss.endpoint</name>
<value>oss-cn-hangzhou.aliyuncs.com</value>
</property>
<property>
<name>fs.oss.buffer.dir</name>
<value>/tmp/oss</value>
</property>
<property>
<name>fs.oss.multipart.download.size</name>
<value>102400</value>
</property>
</configuration>

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.internal.AssumptionViolatedException;
import java.io.IOException;
import java.net.URI;
/**
* Utility class for Aliyun OSS Tests.
*/
public final class AliyunOSSTestUtils {
private AliyunOSSTestUtils() {
}
/**
* Create the test filesystem.
*
* If the test.fs.oss.name property is not set,
* tests will fail.
*
* @param conf configuration
* @return the FS
* @throws IOException
*/
public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
throws IOException {
String fsname = conf.getTrimmed(
TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
boolean liveTest = StringUtils.isNotEmpty(fsname);
URI testURI = null;
if (liveTest) {
testURI = URI.create(fsname);
liveTest = testURI.getScheme().equals(Constants.FS_OSS);
}
if (!liveTest) {
throw new AssumptionViolatedException("No test filesystem in "
+ TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME);
}
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
ossfs.initialize(testURI, conf);
return ossfs;
}
/**
* Generate unique test path for multiple user tests.
*
* @return root test path
*/
public static String generateUniqueTestPath() {
String testUniqueForkId = System.getProperty("test.unique.fork.id");
return testUniqueForkId == null ? "/test" :
"/" + testUniqueForkId + "/test";
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.common.auth.Credentials;
import com.aliyun.oss.common.auth.InvalidCredentialsException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
/**
* Tests use of temporary credentials (for example, Aliyun STS & Aliyun OSS).
* This test extends a class that "does things to the root directory", and
* should only be used against transient filesystems where you don't care about
* the data.
*/
public class TestAliyunCredentials extends AbstractFSContractTestBase {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
@Test
public void testCredentialMissingAccessKeyId() throws Throwable {
Configuration conf = new Configuration();
conf.set(ACCESS_KEY_ID, "");
conf.set(ACCESS_KEY_SECRET, "accessKeySecret");
conf.set(SECURITY_TOKEN, "token");
validateCredential(conf);
}
@Test
public void testCredentialMissingAccessKeySecret() throws Throwable {
Configuration conf = new Configuration();
conf.set(ACCESS_KEY_ID, "accessKeyId");
conf.set(ACCESS_KEY_SECRET, "");
conf.set(SECURITY_TOKEN, "token");
validateCredential(conf);
}
private void validateCredential(Configuration conf) {
try {
AliyunCredentialsProvider provider
= new AliyunCredentialsProvider(conf);
Credentials credentials = provider.getCredentials();
fail("Expected a CredentialInitializationException, got " + credentials);
} catch (InvalidCredentialsException expected) {
// expected
} catch (IOException e) {
fail("Unexpected exception.");
}
}
}

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* Tests a live Aliyun OSS system.
*
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest from
* TestCase which uses the old Junit3 runner that doesn't ignore assumptions
* properly making it impossible to skip the tests if we don't have a valid
* bucket.
*/
public class TestAliyunOSSFileSystemContract
extends FileSystemContractBaseTest {
public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath();
@Override
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
super.setUp();
}
@Override
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(super.path(testRootPath), true);
}
super.tearDown();
}
@Override
protected Path path(String path) {
if (path.startsWith("/")) {
return super.path(testRootPath + path);
} else {
return super.path(testRootPath + "/" + path);
}
}
@Override
public void testMkdirsWithUmask() throws Exception {
// not supported
}
@Override
public void testRootDirAlwaysExists() throws Exception {
//this will throw an exception if the path is not found
fs.getFileStatus(super.path("/"));
//this catches overrides of the base exists() method that don't
//use getFileStatus() as an existence probe
assertTrue("FileSystem.exists() fails for root",
fs.exists(super.path("/")));
}
@Override
public void testRenameRootDirForbidden() throws Exception {
if (!renameSupported()) {
return;
}
rename(super.path("/"),
super.path("/test/newRootDir"),
false, true, false);
}
public void testDeleteSubdir() throws IOException {
Path parentDir = this.path("/test/hadoop");
Path file = this.path("/test/hadoop/file");
Path subdir = this.path("/test/hadoop/subdir");
this.createFile(file);
assertTrue("Created subdir", this.fs.mkdirs(subdir));
assertTrue("File exists", this.fs.exists(file));
assertTrue("Parent dir exists", this.fs.exists(parentDir));
assertTrue("Subdir exists", this.fs.exists(subdir));
assertTrue("Deleted subdir", this.fs.delete(subdir, true));
assertTrue("Parent should exist", this.fs.exists(parentDir));
assertTrue("Deleted file", this.fs.delete(file, false));
assertTrue("Parent should exist", this.fs.exists(parentDir));
}
@Override
protected boolean renameSupported() {
return true;
}
@Override
public void testRenameNonExistentPath() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/path");
Path dst = this.path("/test/new/newpath");
try {
super.rename(src, dst, false, false, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameFileMoveToNonExistentDirectory() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/file");
this.createFile(src);
Path dst = this.path("/test/new/newfile");
try {
super.rename(src, dst, false, true, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/dir");
this.fs.mkdirs(src);
Path dst = this.path("/test/new/newdir");
try {
super.rename(src, dst, false, true, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameFileMoveToExistingDirectory() throws Exception {
super.testRenameFileMoveToExistingDirectory();
}
@Override
public void testRenameFileAsExistingFile() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/file");
this.createFile(src);
Path dst = this.path("/test/new/newfile");
this.createFile(dst);
try {
super.rename(src, dst, false, true, true);
fail("Should throw FileAlreadyExistsException");
} catch (FileAlreadyExistsException e) {
// expected
}
}
}
@Override
public void testRenameDirectoryAsExistingFile() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/dir");
this.fs.mkdirs(src);
Path dst = this.path("/test/new/newfile");
this.createFile(dst);
try {
super.rename(src, dst, false, true, true);
fail("Should throw FileAlreadyExistsException");
} catch (FileAlreadyExistsException e) {
// expected
}
}
}
public void testGetFileStatusFileAndDirectory() throws Exception {
Path filePath = this.path("/test/oss/file1");
this.createFile(filePath);
assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile());
assertFalse("Should not be directory",
this.fs.getFileStatus(filePath).isDirectory());
Path dirPath = this.path("/test/oss/dir");
this.fs.mkdirs(dirPath);
assertTrue("Should be directory",
this.fs.getFileStatus(dirPath).isDirectory());
assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile());
}
public void testMkdirsForExistingFile() throws Exception {
Path testFile = this.path("/test/hadoop/file");
assertFalse(this.fs.exists(testFile));
this.createFile(testFile);
assertTrue(this.fs.exists(testFile));
try {
this.fs.mkdirs(testFile);
fail("Should throw FileAlreadyExistsException!");
} catch (FileAlreadyExistsException e) {
// expected
}
}
public void testWorkingDirectory() throws Exception {
Path workDir = super.path(this.getDefaultWorkingDirectory());
assertEquals(workDir, this.fs.getWorkingDirectory());
this.fs.setWorkingDirectory(super.path("."));
assertEquals(workDir, this.fs.getWorkingDirectory());
this.fs.setWorkingDirectory(super.path(".."));
assertEquals(workDir.getParent(), this.fs.getWorkingDirectory());
Path relativeDir = super.path("hadoop");
this.fs.setWorkingDirectory(relativeDir);
assertEquals(relativeDir, this.fs.getWorkingDirectory());
Path absoluteDir = super.path("/test/hadoop");
this.fs.setWorkingDirectory(absoluteDir);
assertEquals(absoluteDir, this.fs.getWorkingDirectory());
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;
/**
* Test the bridging logic between Hadoop's abstract filesystem and
* Aliyun OSS.
*/
public class TestAliyunOSSFileSystemStore {
private Configuration conf;
private AliyunOSSFileSystemStore store;
private AliyunOSSFileSystem fs;
@Before
public void setUp() throws Exception {
conf = new Configuration();
fs = new AliyunOSSFileSystem();
fs.initialize(URI.create(conf.get("test.fs.oss.name")), conf);
store = fs.getStore();
}
@After
public void tearDown() throws Exception {
try {
store.purge("test");
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
@BeforeClass
public static void checkSettings() throws Exception {
Configuration conf = new Configuration();
assumeNotNull(conf.get(Constants.ACCESS_KEY_ID));
assumeNotNull(conf.get(Constants.ACCESS_KEY_SECRET));
assumeNotNull(conf.get("test.fs.oss.name"));
}
protected void writeRenameReadCompare(Path path, long len)
throws IOException, NoSuchAlgorithmException {
// If len > fs.oss.multipart.upload.threshold,
// we'll use a multipart upload copy
MessageDigest digest = MessageDigest.getInstance("MD5");
OutputStream out = new BufferedOutputStream(
new DigestOutputStream(fs.create(path, false), digest));
for (long i = 0; i < len; i++) {
out.write('Q');
}
out.flush();
out.close();
assertTrue("Exists", fs.exists(path));
Path copyPath = path.suffix(".copy");
fs.rename(path, copyPath);
assertTrue("Copy exists", fs.exists(copyPath));
// Download file from Aliyun OSS and compare the digest against the original
MessageDigest digest2 = MessageDigest.getInstance("MD5");
InputStream in = new BufferedInputStream(
new DigestInputStream(fs.open(copyPath), digest2));
long copyLen = 0;
while (in.read() != -1) {
copyLen++;
}
in.close();
assertEquals("Copy length matches original", len, copyLen);
assertArrayEquals("Digests match", digest.digest(), digest2.digest());
}
@Test
public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
// Regular upload, regular copy
writeRenameReadCompare(new Path("/test/small"), 16384);
}
@Test
public void testLargeUpload()
throws IOException, NoSuchAlgorithmException {
// Multipart upload, multipart copy
writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import static org.junit.Assert.assertTrue;
/**
* Tests basic functionality for AliyunOSSInputStream, including seeking and
* reading files.
*/
public class TestAliyunOSSInputStream {
private FileSystem fs;
private static final Logger LOG =
LoggerFactory.getLogger(TestAliyunOSSInputStream.class);
private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(new Path(testRootPath), true);
}
}
private Path setPath(String path) {
if (path.startsWith("/")) {
return new Path(testRootPath + path);
} else {
return new Path(testRootPath + "/" + path);
}
}
@Test
public void testSeekFile() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream instream = this.fs.open(smallSeekFile);
int seekTimes = 5;
LOG.info("multiple fold position seeking test...:");
for (int i = 0; i < seekTimes; i++) {
long pos = size / (seekTimes - i) - 1;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
LOG.info("random position seeking test...:");
Random rand = new Random();
for (int i = 0; i < seekTimes; i++) {
long pos = Math.abs(rand.nextLong()) % size;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
IOUtils.closeStream(instream);
}
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;
final int sizeFlag = 5;
String filename = "readTestFile_" + sizeFlag + ".txt";
Path readTestFile = setPath("/test/" + filename);
long size = sizeFlag * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255);
LOG.info(sizeFlag + "MB file created: /test/" + filename);
FSDataInputStream instream = this.fs.open(readTestFile);
byte[] buf = new byte[bufLen];
long bytesRead = 0;
while (bytesRead < size) {
int bytes;
if (size - bytesRead < bufLen) {
int remaining = (int)(size - bytesRead);
bytes = instream.read(buf, 0, remaining);
} else {
bytes = instream.read(buf, 0, bufLen);
}
bytesRead += bytes;
if (bytesRead % (1024 * 1024) == 0) {
int available = instream.available();
int remaining = (int)(size - bytesRead);
assertTrue("expected remaining:" + remaining + ", but got:" + available,
remaining == available);
LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024))
+ " MB");
}
}
assertTrue(instream.available() == 0);
IOUtils.closeStream(instream);
}
}

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
/**
* Tests regular and multi-part upload functionality for AliyunOSSOutputStream.
*/
public class TestAliyunOSSOutputStream {
private FileSystem fs;
private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024);
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(new Path(testRootPath), true);
}
}
protected Path getTestPath() {
return new Path(testRootPath + "/test-aliyun-oss");
}
@Test
public void testRegularUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
}
@Test
public void testMultiPartUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
}
@Test
public void testMultiPartUploadLimit() throws IOException {
long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
100 * 1024);
assert(10000 * 100 * 1024 / partSize3
< Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
100 * 1024);
assert(10001 * 100 * 1024 / partSize4
< Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
/**
* The contract of Aliyun OSS: only enabled if the test bucket is provided.
*/
public class AliyunOSSContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "contract/aliyun-oss.xml";
public AliyunOSSContract(Configuration conf) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
}
@Override
public String getScheme() {
return "oss";
}
@Override
public Path getTestPath() {
String testUniqueForkId = System.getProperty("test.unique.fork.id");
return testUniqueForkId == null ? super.getTestPath() :
new Path("/" + testUniqueForkId, "test");
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract creating tests.
*/
public class TestAliyunOSSContractCreate extends AbstractContractCreateTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract deleting tests.
*/
public class TestAliyunOSSContractDelete extends AbstractContractDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* Contract test suite covering Aliyun OSS integration with DistCp.
*/
public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
@Override
protected Configuration createConfiguration() {
Configuration newConf = super.createConfiguration();
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
return newConf;
}
@Override
protected AliyunOSSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Test getFileStatus and related listing operations.
*/
public class TestAliyunOSSContractGetFileStatus
extends AbstractContractGetFileStatusTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract directory tests.
*/
public class TestAliyunOSSContractMkdir extends AbstractContractMkdirTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract opening file tests.
*/
public class TestAliyunOSSContractOpen extends AbstractContractOpenTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract renaming tests.
*/
public class TestAliyunOSSContractRename extends AbstractContractRenameTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* Root dir operations against an Aliyun OSS bucket.
*/
public class TestAliyunOSSContractRootDir extends
AbstractContractRootDirectoryTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestAliyunOSSContractRootDir.class);
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
@Override
public void testListEmptyRootDirectory() throws IOException {
for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) {
try {
super.testListEmptyRootDirectory();
break;
} catch (AssertionError | FileNotFoundException e) {
if (attempt < maxAttempts) {
LOG.info("Attempt {} of {} for empty root directory test failed. "
+ "Attempting retry.", attempt, maxAttempts);
try {
Thread.sleep(1000);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
fail("Test interrupted.");
break;
}
} else {
LOG.error(
"Empty root directory test failed {} attempts. Failing test.",
maxAttempts);
throw e;
}
}
}
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Aliyun OSS contract seeking tests.
*/
public class TestAliyunOSSContractSeek extends AbstractContractSeekTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AliyunOSSContract(conf);
}
}

View File

@ -0,0 +1,115 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>10</value>
</property>
<property>
<name>fs.contract.is-blobstore</name>
<value>true</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>false</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>false</value>
</property>
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
</property>
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
<property>
<name>fs.oss.multipart.download.size</name>
<value>102400</value>
</property>
</configuration>

View File

@ -0,0 +1,46 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>target/build/test</value>
<description>A base for other temporary directories.</description>
<final>true</final>
</property>
<!-- Turn security off for tests by default -->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<!--
To run these tests.
# Create a file auth-keys.xml - DO NOT ADD TO REVISION CONTROL
# add the property test.fs.oss.name to point to an OSS filesystem URL
# Add the credentials for the service you are testing against
-->
<include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
<fallback/>
</include>
</configuration>

View File

@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# log4j configuration used during build and unit tests
log4j.rootLogger=INFO,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

View File

@ -100,6 +100,12 @@
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-sls</artifactId>

View File

@ -47,6 +47,7 @@
<module>hadoop-aws</module>
<module>hadoop-kafka</module>
<module>hadoop-azure-datalake</module>
<module>hadoop-aliyun</module>
</modules>
<build>