HADOOP-12756. Incorporate Aliyun OSS file system implementation. Contributed by Mingfei Shi and Lin Zhou

Kai Zheng 2016-08-04 21:21:10 +08:00 committed by Mingfei
parent 5f23abfa30
commit a5d5342228
26 changed files with 2784 additions and 0 deletions

.gitignore (vendored, 2 additions)

@@ -31,3 +31,5 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml
patchprocess/
hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml

hadoop-project/pom.xml

@@ -436,6 +436,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
@@ -1004,6 +1010,22 @@
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.2.1</version>
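<!-- The exclusions below keep the SDK from pulling its own copies of
httpclient and commons-beanutils, which would conflict with the
versions Hadoop already manages (assumed rationale). -->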
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>

hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml

@@ -0,0 +1,18 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<FindBugsFilter>
</FindBugsFilter>

hadoop-tools/hadoop-aliyun/pom.xml

@@ -0,0 +1,133 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-alpha2-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-aliyun</artifactId>
<name>Apache Hadoop Aliyun OSS support</name>
<packaging>jar</packaging>
<properties>
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
</properties>
<profiles>
<profile>
<id>tests-off</id>
<activation>
<file>
<missing>src/test/resources/auth-keys.xml</missing>
</file>
</activation>
<properties>
<maven.test.skip>true</maven.test.skip>
</properties>
</profile>
<profile>
<id>tests-on</id>
<activation>
<file>
<exists>src/test/resources/auth-keys.xml</exists>
</file>
</activation>
<properties>
<maven.test.skip>false</maven.test.skip>
</properties>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput>
<excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
</excludeFilterFile>
<effort>Max</effort>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<configuration>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>deplist</id>
<phase>compile</phase>
<goals>
<goal>list</goal>
</goals>
<configuration>
<!-- build a shellprofile -->
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
</project>
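
For context, a minimal sketch of how a client could wire the new module up once it is on the classpath. The fs.oss.* keys are the ones this patch defines in Constants.java; the bucket name, endpoint, environment variables, and the explicit fs.oss.impl binding are illustrative assumptions, not part of this commit:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OssQuickStart {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bind the oss:// scheme to the new implementation (only needed if
        // the deployment's default configuration does not already do so).
        conf.set("fs.oss.impl",
            "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        conf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
        conf.set("fs.oss.access.key", System.getenv("OSS_ACCESS_KEY"));
        conf.set("fs.oss.secret.key", System.getenv("OSS_SECRET_KEY"));
        FileSystem fs = FileSystem.get(URI.create("oss://mybucket/"), conf);
        for (FileStatus st : fs.listStatus(new Path("/"))) {
          System.out.println(st.getPath());
        }
        fs.close();
      }
    }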

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -0,0 +1,847 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import com.aliyun.oss.common.auth.CredentialsProvider;
import com.aliyun.oss.common.auth.DefaultCredentialProvider;
import com.aliyun.oss.common.auth.DefaultCredentials;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.UserInfo;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.ClientConfiguration;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.comm.Protocol;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.CannedAccessControlList;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.CopyObjectResult;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ListObjectsRequest;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.UploadPartCopyRequest;
import com.aliyun.oss.model.UploadPartCopyResult;
/**
* Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
* Aliyun OSS</a>, used to access the OSS blob system in a filesystem style.
*/
public class AliyunOSSFileSystem extends FileSystem {
private URI uri;
private Path workingDir;
private OSSClient ossClient;
private String bucketName;
private long uploadPartSize;
private long multipartThreshold;
private int maxKeys;
private String serverSideEncryptionAlgorithm;
@Override
public FSDataOutputStream append(Path path, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Append is not supported!");
}
@Override
public void close() throws IOException {
try {
if (ossClient != null) {
ossClient.shutdown();
}
} finally {
super.close();
}
}
@Override
public FSDataOutputStream create(Path path, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
String key = pathToKey(path);
if (!overwrite && exists(path)) {
throw new FileAlreadyExistsException(path + " already exists");
}
return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
ossClient, bucketName, key, progress, statistics,
serverSideEncryptionAlgorithm), (Statistics)(null));
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
FileStatus status;
try {
status = getFileStatus(path);
} catch (FileNotFoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Couldn't delete " + path + ": Does not exist!");
}
return false;
}
String key = pathToKey(status.getPath());
if (status.isDirectory()) {
if (!key.endsWith("/")) {
key += "/";
}
if (!recursive) {
FileStatus[] statuses = listStatus(status.getPath());
// Check whether it is an empty directory or not
if (statuses.length > 0) {
throw new IOException("Cannot remove directory" + path +
": It is not empty!");
} else {
// Delete empty directory without '-r'
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
} else {
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key);
listRequest.setMaxKeys(maxKeys);
while (true) {
ObjectListing objects = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
List<String> keysToDelete = new ArrayList<String>();
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
keysToDelete.add(objectSummary.getKey());
}
DeleteObjectsRequest deleteRequest =
new DeleteObjectsRequest(bucketName);
deleteRequest.setKeys(keysToDelete);
ossClient.deleteObjects(deleteRequest);
statistics.incrementWriteOps(1);
if (objects.isTruncated()) {
listRequest.setMarker(objects.getNextMarker());
} else {
break;
}
}
}
} else {
ossClient.deleteObject(bucketName, key);
statistics.incrementWriteOps(1);
}
//TODO: optimize logic here
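// Deleting the last object under a prefix can make the parent
// "directory" vanish, since directories only exist as empty marker
// objects; if the parent no longer resolves, recreate its marker.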
try {
Path pPath = status.getPath().getParent();
FileStatus pStatus = getFileStatus(pPath);
if (pStatus.isDirectory()) {
return true;
} else {
throw new IOException("Path " + pPath +
" is assumed to be a directory!");
}
} catch (FileNotFoundException fnfe) {
// Make sure the parent directory exists
return mkdir(bucketName, pathToKey(status.getPath().getParent()));
}
}
@Override
public FileStatus getFileStatus(Path path) throws IOException {
Path qualifiedPath = path.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
// Root always exists
if (key.length() == 0) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
}
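// Probe order: the exact key (a plain file), then key + "/" (an
// explicit directory marker), then a one-entry listing under the
// prefix (an implicit directory that has children but no marker).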
ObjectMetadata meta = getObjectMetadata(key);
// If key not found and key does not end with "/"
if (meta == null && !key.endsWith("/")) {
// Case: dir + "/"
key += "/";
meta = getObjectMetadata(key);
}
if (meta == null) {
// Case: dir + "/" + file
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
listRequest.setPrefix(key);
listRequest.setDelimiter("/");
listRequest.setMaxKeys(1);
ObjectListing listing = ossClient.listObjects(listRequest);
statistics.incrementReadOps(1);
if (!listing.getObjectSummaries().isEmpty() ||
!listing.getCommonPrefixes().isEmpty()) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
} else {
throw new FileNotFoundException(path + ": No such file or directory!");
}
} else if (objectRepresentsDirectory(key, meta.getContentLength())) {
return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
} else {
return new FileStatus(meta.getContentLength(), false, 1,
getDefaultBlockSize(path), meta.getLastModified().getTime(),
qualifiedPath);
}
}
/**
* Return object metadata given object key.
*
* @param key object key
* @return the object metadata, or null if the key does not exist
*/
private ObjectMetadata getObjectMetadata(String key) {
try {
return ossClient.getObjectMetadata(bucketName, key);
} catch (OSSException osse) {
return null;
} finally {
statistics.incrementReadOps(1);
}
}
@Override
public String getScheme() {
return "oss";
}
@Override
public URI getUri() {
return uri;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Deprecated
public long getDefaultBlockSize() {
return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
/**
* Initialize new FileSystem.
*
* @param name the uri of the file system, including host, port, etc.
*
* @param conf configuration of the file system
* @throws IOException IO problems
*/
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
uri = URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir =
new Path("/user",
System.getProperty("user.name")).makeQualified(uri, null);
bucketName = name.getHost();
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
MAXIMUM_CONNECTIONS_DEFAULT));
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
SECURE_CONNECTIONS_DEFAULT);
clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
MAX_ERROR_RETRIES_DEFAULT));
clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
ESTABLISH_TIMEOUT_DEFAULT));
clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
SOCKET_TIMEOUT_DEFAULT));
String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
if (!proxyHost.isEmpty()) {
clientConf.setProxyHost(proxyHost);
if (proxyPort >= 0) {
clientConf.setProxyPort(proxyPort);
} else {
if (secureConnections) {
LOG.warn("Proxy host set without port. Using HTTPS default 443");
clientConf.setProxyPort(443);
} else {
LOG.warn("Proxy host set without port. Using HTTP default 80");
clientConf.setProxyPort(80);
}
}
String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
if ((proxyUsername == null) != (proxyPassword == null)) {
String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
PROXY_PASSWORD_KEY + " set without the other.";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
clientConf.setProxyUsername(proxyUsername);
clientConf.setProxyPassword(proxyPassword);
clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
} else if (proxyPort >= 0) {
String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
PROXY_HOST_KEY;
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
ossClient =
new OSSClient(endPoint, getCredentialsProvider(name, conf), clientConf);
maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
if (uploadPartSize < 5 * 1024 * 1024) {
LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
uploadPartSize = 5 * 1024 * 1024;
}
if (multipartThreshold < 5 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
multipartThreshold = 5 * 1024 * 1024;
}
if (multipartThreshold > 1024 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
multipartThreshold = 1024 * 1024 * 1024;
}
String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
if (!cannedACLName.isEmpty()) {
CannedAccessControlList cannedACL =
CannedAccessControlList.valueOf(cannedACLName);
ossClient.setBucketAcl(bucketName, cannedACL);
}
serverSideEncryptionAlgorithm =
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
setConf(conf);
}
/**
* Create the default credential provider, or load in one explicitly
* identified in the configuration.
* @param name the uri of the file system
* @param conf configuration
* @return a credential provider
* @throws IOException on any problem. Class construction issues may be
* nested inside the IOE.
*/
private CredentialsProvider getCredentialsProvider(URI name,
Configuration conf) throws IOException {
CredentialsProvider credentials;
String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
if (StringUtils.isEmpty(className)) {
Configuration newConf =
ProviderUtils.excludeIncompatibleCredentialProviders(conf,
AliyunOSSFileSystem.class);
String accessKey =
AliyunOSSUtils.getPassword(newConf, ACCESS_KEY,
UserInfo.EMPTY.getUser());
String secretKey =
AliyunOSSUtils.getPassword(newConf, SECRET_KEY,
UserInfo.EMPTY.getPassword());
credentials =
new DefaultCredentialProvider(
new DefaultCredentials(accessKey, secretKey));
} else {
try {
LOG.debug("Credential provider class is:" + className);
Class<?> credClass = Class.forName(className);
try {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor(
URI.class, Configuration.class).newInstance(this.uri, conf);
} catch (NoSuchMethodException | SecurityException e) {
credentials =
(CredentialsProvider)credClass.getDeclaredConstructor()
.newInstance();
}
} catch (ClassNotFoundException e) {
throw new IOException(className + " not found.", e);
} catch (NoSuchMethodException | SecurityException e) {
throw new IOException(String.format("%s constructor exception. A " +
"class specified in %s must provide an accessible constructor " +
"accepting URI and Configuration, or an accessible default " +
"constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), e);
} catch (ReflectiveOperationException | IllegalArgumentException e) {
throw new IOException(className + " instantiation exception.", e);
}
}
return credentials;
}
/**
* Check if OSS object represents a directory.
*
* @param name object key
* @param size object content length
* @return true if object represents a directory
*/
private boolean objectRepresentsDirectory(final String name,
final long size) {
return !name.isEmpty() && name.endsWith("/") && size == 0L;
}
/**
* Turns a path (relative or otherwise) into an OSS key.
*
* @param path the path of the file
* @return the key of the object that represents the file
*/
private String pathToKey(Path path) {
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
return "";
}
return path.toUri().getPath().substring(1);
}
private Path keyToPath(String key) {
return new Path("/" + key);
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
String key = pathToKey(path);
if (LOG.isDebugEnabled()) {
LOG.debug("List status for path: " + path);
}
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
if (!key.endsWith("/")) {
key = key + "/";
}
ListObjectsRequest listObjectsRequest =
new ListObjectsRequest(bucketName);
listObjectsRequest.setPrefix(key);
listObjectsRequest.setDelimiter("/");
listObjectsRequest.setMaxKeys(maxKeys);
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: doing listObjects for directory " + key);
}
while (true) {
ObjectListing objects = ossClient.listObjects(listObjectsRequest);
statistics.incrementReadOps(1);
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
Path keyPath = keyToPath(objectSummary.getKey())
.makeQualified(uri, workingDir);
if (keyPath.equals(path)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring: " + keyPath);
}
continue;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: fi: " + keyPath);
}
result.add(new FileStatus(objectSummary.getSize(), false, 1,
getDefaultBlockSize(keyPath),
objectSummary.getLastModified().getTime(), keyPath));
}
}
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
if (keyPath.equals(path)) {
continue;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd: " + keyPath);
}
result.add(new FileStatus(0, true, 1, 0, 0, keyPath));
}
}
if (objects.isTruncated()) {
if (LOG.isDebugEnabled()) {
LOG.debug("listStatus: list truncated - getting next batch");
}
listObjectsRequest.setMarker(objects.getNextMarker());
statistics.incrementReadOps(1);
} else {
break;
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding: rd (not a dir): " + path);
}
result.add(fileStatus);
}
return result.toArray(new FileStatus[result.size()]);
}
/**
* Used to create an empty file that represents an empty directory.
*
* @param bucket the bucket this directory belongs to
* @param objectName directory path
* @return true if directory successfully created
* @throws IOException
*/
private boolean mkdir(final String bucket, final String objectName)
throws IOException {
String dirName = objectName;
ObjectMetadata dirMeta = new ObjectMetadata();
byte[] buffer = new byte[0];
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
dirMeta.setContentLength(0);
if (!objectName.endsWith("/")) {
dirName += "/";
}
try {
ossClient.putObject(bucket, dirName, in, dirMeta);
return true;
} finally {
in.close();
}
}
@Override
public boolean mkdirs(Path path, FsPermission permission)
throws IOException {
try {
FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
return true;
} else {
throw new FileAlreadyExistsException("Path is a file: " + path);
}
} catch (FileNotFoundException e) {
validatePath(path);
String key = pathToKey(path);
return mkdir(bucketName, key);
}
}
/**
* Check whether the path is a valid path.
*
* @param path the path to be checked
* @throws IOException
*/
private void validatePath(Path path) throws IOException {
Path fPart = path.getParent();
do {
try {
FileStatus fileStatus = getFileStatus(fPart);
if (fileStatus.isDirectory()) {
// If path exists and a directory, exit
break;
} else {
throw new FileAlreadyExistsException(String.format(
"Can't make directory for path '%s', it is a file.", fPart));
}
} catch (FileNotFoundException fnfe) {
}
fPart = fPart.getParent();
} while (fPart != null);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path +
" because it is a directory");
}
return new FSDataInputStream(new AliyunOSSInputStream(getConf(), ossClient,
bucketName, pathToKey(path), fileStatus.getLen(), statistics));
}
@Override
public boolean rename(Path srcPath, Path dstPath) throws IOException {
if (srcPath.isRoot()) {
// Cannot rename root of file system
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename the root of a filesystem");
}
return false;
}
Path parent = dstPath.getParent();
while (parent != null && !srcPath.equals(parent)) {
parent = parent.getParent();
}
if (parent != null) {
return false;
}
FileStatus srcStatus = getFileStatus(srcPath);
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dstPath);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (dstStatus == null) {
// If dst doesn't exist, check whether dst dir exists or not
dstStatus = getFileStatus(dstPath.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", srcPath, dstPath,
dstPath.getParent()));
}
} else {
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory
dstPath = new Path(dstPath, srcPath.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dstPath);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory / not empty
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists or not empty!",
srcPath, dstPath));
}
} else {
// If dst is not a directory
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists!", srcPath,
dstPath));
}
}
boolean succeeded;
if (srcStatus.isDirectory()) {
succeeded = copyDirectory(srcPath, dstPath);
} else {
succeeded = copyFile(srcPath, dstPath);
}
if (!succeeded) {
// Do not delete the source if the copy failed
return false;
}
if (srcPath.equals(dstPath)) {
return true;
} else {
return delete(srcPath, true);
}
}
/**
* Copy file from source path to destination path.
* (the caller should make sure srcPath is a file and dstPath is valid.)
*
* @param srcPath source path
* @param dstPath destination path
* @return true if successfully copied
*/
private boolean copyFile(Path srcPath, Path dstPath) {
String srcKey = pathToKey(srcPath);
String dstKey = pathToKey(dstPath);
return copyFile(srcKey, dstKey);
}
/**
* Copy an object from source key to destination key.
*
* @param srcKey source key
* @param dstKey destination key
* @return true if successfully copied
*/
private boolean copyFile(String srcKey, String dstKey) {
ObjectMetadata objectMeta =
ossClient.getObjectMetadata(bucketName, srcKey);
long dataLen = objectMeta.getContentLength();
if (dataLen <= multipartThreshold) {
return singleCopy(srcKey, dstKey);
} else {
return multipartCopy(srcKey, dataLen, dstKey);
}
}
/**
* Use single copy to copy an oss object.
*
* @param srcKey source key
* @param dstKey destination key
* @return true if successfully copied
* (the caller should make sure srcPath is a file and dstPath is valid)
*/
private boolean singleCopy(String srcKey, String dstKey) {
CopyObjectResult copyResult =
ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
LOG.debug(copyResult.getETag());
return true;
}
/**
* Use multipart copy to copy an oss object.
* (the caller should make sure srcPath is a file and dstPath is valid)
*
* @param srcKey source key
* @param dataLen data size of the object to copy
* @param dstKey destination key
* @return true if successfully copied, or false if upload is aborted
*/
private boolean multipartCopy(String srcKey, long dataLen, String dstKey) {
int partNum = (int)(dataLen / uploadPartSize);
if (dataLen % uploadPartSize != 0) {
partNum++;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, dstKey);
ObjectMetadata meta = new ObjectMetadata();
if (!serverSideEncryptionAlgorithm.isEmpty()) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
String uploadId = initiateMultipartUploadResult.getUploadId();
List<PartETag> partETags = new ArrayList<PartETag>();
try {
for (int i = 0; i < partNum; i++) {
long skipBytes = uploadPartSize * i;
long size = (uploadPartSize < dataLen - skipBytes) ?
uploadPartSize : dataLen - skipBytes;
UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
partCopyRequest.setSourceBucketName(bucketName);
partCopyRequest.setSourceKey(srcKey);
partCopyRequest.setBucketName(bucketName);
partCopyRequest.setKey(dstKey);
partCopyRequest.setUploadId(uploadId);
partCopyRequest.setPartSize(size);
partCopyRequest.setBeginIndex(skipBytes);
partCopyRequest.setPartNumber(i + 1);
UploadPartCopyResult partCopyResult =
ossClient.uploadPartCopy(partCopyRequest);
statistics.incrementWriteOps(1);
partETags.add(partCopyResult.getPartETag());
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, dstKey,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
return true;
} catch (Exception e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
return false;
}
}
/**
* Copy a directory from source path to destination path.
* (the caller should make sure srcPath is a directory, and dstPath is valid)
*
* @param srcPath source path
* @param dstPath destination path
* @return true if successfully copied
*/
private boolean copyDirectory(Path srcPath, Path dstPath) {
String srcKey = pathToKey(srcPath);
String dstKey = pathToKey(dstPath);
if (!srcKey.endsWith("/")) {
srcKey = srcKey + "/";
}
if (!dstKey.endsWith("/")) {
dstKey = dstKey + "/";
}
if (dstKey.startsWith(srcKey)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot rename a directory to a subdirectory of self");
}
return false;
}
ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName);
listObjectsRequest.setPrefix(srcKey);
listObjectsRequest.setMaxKeys(maxKeys);
ObjectListing objects = ossClient.listObjects(listObjectsRequest);
statistics.incrementReadOps(1);
// Copy files from src folder to dst
while (true) {
for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
String newKey =
dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
copyFile(objectSummary.getKey(), newKey);
}
if (objects.isTruncated()) {
listObjectsRequest.setMarker(objects.getNextMarker());
statistics.incrementReadOps(1);
} else {
break;
}
}
return true;
}
@Override
public void setWorkingDirectory(Path dir) {
this.workingDir = dir;
}
}
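
getCredentialsProvider() above loads the class named by fs.oss.credentials.provider, preferring a (URI, Configuration) constructor and falling back to a no-arg one. A hypothetical provider sketch, assuming the SDK's DefaultCredentialProvider is subclassable; the class name and environment variables are made up:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import com.aliyun.oss.common.auth.DefaultCredentialProvider;
    import com.aliyun.oss.common.auth.DefaultCredentials;

    public class EnvCredentialsProvider extends DefaultCredentialProvider {
      // Invoked reflectively by AliyunOSSFileSystem#getCredentialsProvider.
      public EnvCredentialsProvider(URI uri, Configuration conf) {
        super(new DefaultCredentials(System.getenv("OSS_ACCESS_KEY"),
            System.getenv("OSS_SECRET_KEY")));
      }
    }

Selected by setting fs.oss.credentials.provider to the fully qualified class name.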

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java

@@ -0,0 +1,268 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.GetObjectRequest;
/**
* The input stream for the OSS blob system.
* The class uses multi-part downloading to read data from the object content
* stream.
*/
public class AliyunOSSInputStream extends FSInputStream {
public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
private static final int MAX_RETRIES = 10;
private final long downloadPartSize;
private String bucketName;
private String key;
private OSSClient ossClient;
private Statistics statistics;
private boolean closed;
private InputStream wrappedStream = null;
private long dataLen;
private long position;
private long partRemaining;
public AliyunOSSInputStream(Configuration conf, OSSClient client,
String bucketName, String key, Long dataLen, Statistics statistics)
throws IOException {
this.bucketName = bucketName;
this.key = key;
ossClient = client;
this.statistics = statistics;
this.dataLen = dataLen;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
reopen(0);
closed = false;
}
/**
* Reopen the wrapped stream at the given position, requesting at most
* one part length of data from the object content stream.
*
* @param pos position from start of a file
* @throws IOException if failed to reopen
*/
private synchronized void reopen(long pos) throws IOException {
long partLen;
if (pos < 0) {
throw new EOFException("Cannot seek at negtive position:" + pos);
} else if (pos > dataLen) {
throw new EOFException("Cannot seek after EOF, fileLen:" + dataLen +
" position:" + pos);
} else if (pos + downloadPartSize > dataLen) {
partLen = dataLen - pos;
} else {
partLen = downloadPartSize;
}
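// Example with the default 100 KB part size: on a 250 KB object,
// reopen(0) requests bytes 0..102399, and reopen(204800) requests
// 204800..255999, the final short part.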
if (wrappedStream != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Aborting old stream to open at pos " + pos);
}
wrappedStream.close();
}
GetObjectRequest request = new GetObjectRequest(bucketName, key);
request.setRange(pos, pos + partLen - 1);
wrappedStream = ossClient.getObject(request).getObjectContent();
if (wrappedStream == null) {
throw new IOException("Null IO stream");
}
position = pos;
partRemaining = partLen;
}
@Override
public synchronized int read() throws IOException {
checkNotClosed();
if (partRemaining <= 0 && position < dataLen) {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int byteRead = -1;
do {
retry = false;
try {
byteRead = wrappedStream.read();
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (byteRead >= 0) {
position++;
partRemaining--;
}
if (statistics != null && byteRead >= 0) {
statistics.incrementBytesRead(1);
}
return byteRead;
}
/**
* Check whether the input stream is closed.
*
* @throws IOException if stream is closed
*/
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException("Stream is closed!");
}
}
@Override
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
checkNotClosed();
if (buf == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > buf.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int bytesRead = 0;
// Not EOF, and read not done
while (position < dataLen && bytesRead < len) {
if (partRemaining == 0) {
reopen(position);
}
int tries = MAX_RETRIES;
boolean retry;
int bytes = -1;
do {
retry = false;
try {
bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
} catch (Exception e) {
handleReadException(e, --tries);
retry = true;
}
} while (retry);
if (bytes > 0) {
bytesRead += bytes;
position += bytes;
partRemaining -= bytes;
} else if (partRemaining != 0) {
throw new IOException("Failed to read from stream. Remaining:" +
partRemaining);
}
}
if (statistics != null && bytesRead > 0) {
statistics.incrementBytesRead(bytesRead);
}
// Read nothing, but attempt to read something
if (bytesRead == 0 && len > 0) {
return -1;
} else {
return bytesRead;
}
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (wrappedStream != null) {
wrappedStream.close();
}
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = dataLen - position;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}
@Override
public void seek(long pos) throws IOException {
checkNotClosed();
if (position == pos) {
return;
} else if (pos > position && pos < position + partRemaining) {
long skipped = wrappedStream.skip(pos - position);
// Keep the part accounting in sync; skip() may skip fewer bytes
// than requested, in which case we fall back to a reopen
position += skipped;
partRemaining -= skipped;
if (position != pos) {
reopen(pos);
}
} else {
reopen(pos);
}
}
@Override
public long getPos() throws IOException {
checkNotClosed();
return position;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
checkNotClosed();
return false;
}
private void handleReadException(Exception e, int tries) throws IOException{
if (tries == 0) {
throw new IOException(e);
}
LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
" connection at position '" + position + "', " + e.getMessage());
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
LOG.warn(e2.getMessage());
}
reopen(position);
}
}
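
A short usage fragment (path and sizes hypothetical, and assuming the object is larger than 5 MB): a forward seek that stays inside the currently open part only skips bytes on the wrapped stream, while any other seek goes through reopen() and issues a new ranged GET.

    static void readSketch(org.apache.hadoop.fs.FileSystem fs)
        throws java.io.IOException {
      org.apache.hadoop.fs.FSDataInputStream in =
          fs.open(new org.apache.hadoop.fs.Path("oss://mybucket/data.bin"), 4096);
      byte[] buf = new byte[8192];
      in.readFully(0, buf);        // served by the first 100 KB ranged GET
      in.seek(50 * 1024);          // still inside the part: wrappedStream.skip()
      in.seek(5 * 1024 * 1024);    // outside the part: reopen() with a new range
      in.close();
    }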

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java

@@ -0,0 +1,219 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.AbortMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadRequest;
import com.aliyun.oss.model.CompleteMultipartUploadResult;
import com.aliyun.oss.model.InitiateMultipartUploadRequest;
import com.aliyun.oss.model.InitiateMultipartUploadResult;
import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PartETag;
import com.aliyun.oss.model.PutObjectResult;
import com.aliyun.oss.model.UploadPartRequest;
import com.aliyun.oss.model.UploadPartResult;
/**
* The output stream for the OSS blob system.
* Data will be buffered on local disk, then uploaded to OSS in the
* {@link #close()} method.
*/
public class AliyunOSSOutputStream extends OutputStream {
public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
private String bucketName;
private String key;
private Statistics statistics;
private Progressable progress;
private String serverSideEncryptionAlgorithm;
private long partSize;
private long partSizeThreshold;
private LocalDirAllocator dirAlloc;
private boolean closed;
private File tmpFile;
private BufferedOutputStream backupStream;
private OSSClient ossClient;
public AliyunOSSOutputStream(Configuration conf, OSSClient client,
String bucketName, String key, Progressable progress,
Statistics statistics, String serverSideEncryptionAlgorithm)
throws IOException {
this.bucketName = bucketName;
this.key = key;
// The caller can't get any progress information
this.progress = progress;
ossClient = client;
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
}
dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
closed = false;
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (backupStream != null) {
backupStream.close();
}
long dataLen = tmpFile.length();
try {
if (dataLen <= partSizeThreshold) {
uploadObject();
} else {
multipartUploadObject();
}
} finally {
tmpFile.delete();
}
}
/**
* Upload temporary file as an OSS object, using single upload.
*
* @throws IOException
*/
private void uploadObject() throws IOException {
File object = tmpFile.getAbsoluteFile();
FileInputStream fis = new FileInputStream(object);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(object.length());
if (!serverSideEncryptionAlgorithm.isEmpty()) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
try {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag());
statistics.incrementWriteOps(1);
} finally {
fis.close();
}
}
/**
* Upload temporary file as an OSS object, using multipart upload.
*
* @throws IOException
*/
private void multipartUploadObject() throws IOException {
File object = tmpFile.getAbsoluteFile();
long dataLen = object.length();
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
ObjectMetadata meta = new ObjectMetadata();
// meta.setContentLength(dataLen);
if (!serverSideEncryptionAlgorithm.isEmpty()) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
int partNum = (int)(dataLen / partSize);
if (dataLen % partSize != 0) {
partNum += 1;
}
if (partNum > MULTIPART_UPLOAD_PART_NUM_LIMIT) {
throw new IOException("Number of parts " + partNum + " should not be " +
"bigger than limit " + MULTIPART_UPLOAD_PART_NUM_LIMIT);
}
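// With the default 10 MB part size, the 1000-part cap limits a single
// stream to roughly 10,000 MB; raising fs.oss.multipart.upload.size
// pushes the ceiling higher.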
List<PartETag> partETags = new ArrayList<PartETag>();
String uploadId = initiateMultipartUploadResult.getUploadId();
try {
for (int i = 0; i < partNum; i++) {
// TODO: Optimize this, avoid opening the object multiple times
FileInputStream fis = new FileInputStream(object);
try {
long skipBytes = partSize * i;
fis.skip(skipBytes);
long size = (partSize < dataLen - skipBytes) ?
partSize : dataLen - skipBytes;
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(key);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(fis);
uploadPartRequest.setPartSize(size);
uploadPartRequest.setPartNumber(i + 1);
UploadPartResult uploadPartResult =
ossClient.uploadPart(uploadPartRequest);
statistics.incrementWriteOps(1);
partETags.add(uploadPartResult.getPartETag());
} finally {
fis.close();
}
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
} catch (Exception e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, key, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
// Surface the failure instead of letting close() succeed silently
throw new IOException("Multipart upload of " + key + " failed", e);
}
}
@Override
public synchronized void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void write(int b) throws IOException {
backupStream.write(b);
statistics.incrementBytesWritten(1);
}
}
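
Writes land in the local buffer file; nothing reaches OSS until close(), which picks single or multipart upload by comparing the buffered length against fs.oss.multipart.upload.threshold. A sketch with a hypothetical path:

    static void writeSketch(org.apache.hadoop.fs.FileSystem fs)
        throws java.io.IOException {
      org.apache.hadoop.fs.FSDataOutputStream out = fs.create(
          new org.apache.hadoop.fs.Path("oss://mybucket/out.bin"), true);
      out.write(new byte[64 * 1024 * 1024]);  // spills to the local buffer file
      out.close();  // 64 MB > 20 MB default threshold, so multipart upload here
    }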

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java

@@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
/**
* Utility methods for Aliyun OSS code.
*/
final public class AliyunOSSUtils {
private AliyunOSSUtils() {
}
/**
* User information includes user name and password.
*/
static public class UserInfo {
private final String user;
private final String password;
public static final UserInfo EMPTY = new UserInfo("", "");
public UserInfo(String user, String password) {
this.user = user;
this.password = password;
}
/**
* Predicate to verify user information is set.
* @return true if the username is defined (not null, not empty).
*/
public boolean hasLogin() {
return StringUtils.isNotEmpty(user);
}
/**
* Equality test matches user and password.
* @param o other object
* @return true if the objects are considered equivalent.
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UserInfo that = (UserInfo) o;
return Objects.equals(user, that.user) &&
Objects.equals(password, that.password);
}
@Override
public int hashCode() {
return Objects.hash(user, password);
}
public String getUser() {
return user;
}
public String getPassword() {
return password;
}
}
/**
* Return the supplied default value if it is non-empty; otherwise look
* the password up from the configuration.
* @param conf configuration that contains password information
* @param key the key of the password
* @param val the default value of the key
* @return the value for the key
* @throws IOException if failed to get password from configuration
*/
static public String getPassword(Configuration conf, String key, String val)
throws IOException {
if (StringUtils.isEmpty(val)) {
try {
final char[] pass = conf.getPassword(key);
if (pass != null) {
return (new String(pass)).trim();
} else {
return "";
}
} catch (IOException ioe) {
throw new IOException("Cannot find password option " + key, ioe);
}
} else {
return val;
}
}
/**
* Extract the user information details from a URI.
* @param name URI of the filesystem
* @return a login tuple, possibly empty.
*/
public static UserInfo extractLoginDetails(URI name) {
try {
String authority = name.getAuthority();
if (authority == null) {
return UserInfo.EMPTY;
}
int loginIndex = authority.indexOf('@');
if (loginIndex < 0) {
// No user information
return UserInfo.EMPTY;
}
String login = authority.substring(0, loginIndex);
int loginSplit = login.indexOf(':');
if (loginSplit > 0) {
String user = login.substring(0, loginSplit);
String password = URLDecoder.decode(login.substring(loginSplit + 1),
"UTF-8");
return new UserInfo(user, password);
} else if (loginSplit == 0) {
// There is no user, just a password.
return UserInfo.EMPTY;
} else {
return new UserInfo(login, "");
}
} catch (UnsupportedEncodingException e) {
// This should never happen; translate it if it does.
throw new RuntimeException(e);
}
}
}
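
For illustration, how extractLoginDetails() decomposes a URI authority (bucket and credentials hypothetical):

    // oss://ak:secret@mybucket/dir -> UserInfo("ak", "secret")
    // oss://ak@mybucket/dir        -> UserInfo("ak", "")
    // oss://mybucket/dir           -> UserInfo.EMPTY (no login section)
    AliyunOSSUtils.UserInfo info = AliyunOSSUtils.extractLoginDetails(
        java.net.URI.create("oss://ak:secret@mybucket/dir"));
    assert info.hasLogin() && "ak".equals(info.getUser());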

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java

@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
/**
* All configuration constants for the OSS filesystem.
*/
public final class Constants {
private Constants() {
}
// Class of credential provider
public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
"fs.oss.credentials.provider";
// OSS access verification
public static final String ACCESS_KEY = "fs.oss.access.key";
public static final String SECRET_KEY = "fs.oss.secret.key";
// Number of simultaneous connections to oss
public static final String MAXIMUM_CONNECTIONS_KEY =
"fs.oss.connection.maximum";
public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
// Connect to oss over ssl
public static final String SECURE_CONNECTIONS_KEY =
"fs.oss.connection.secure.enabled";
public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
// Use a custom endpoint
public static final String ENDPOINT_KEY = "fs.oss.endpoint";
// Connect to oss through a proxy server
public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
public static final String PROXY_WORKSTATION_KEY =
"fs.oss.proxy.workstation";
// Number of times we should retry errors
public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
// Time until we give up trying to establish a connection to oss
public static final String ESTABLISH_TIMEOUT_KEY =
"fs.oss.connection.establish.timeout";
public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
// Time until we give up on a connection to oss
public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
// Number of records to get while paging through a directory listing
public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
public static final int MAX_PAGING_KEYS_DEFAULT = 500;
// Size of each multipart piece in bytes
public static final String MULTIPART_UPLOAD_SIZE_KEY =
"fs.oss.multipart.upload.size";
public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 1000;
// Minimum size in bytes before we start a multipart upload or copy
public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
"fs.oss.multipart.upload.threshold";
public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
20 * 1024 * 1024;
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size";
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
// Comma separated list of directories
public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
// private | public-read | public-read-write | authenticated-read |
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
public static final String CANNED_ACL_DEFAULT = "";
// OSS server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
"fs.oss.server-side-encryption-algorithm";
public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
public static final String FS_OSS = "oss";
}
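
These are ordinary Configuration keys; a small tuning sketch (values are examples only; note that AliyunOSSFileSystem#initialize clamps the upload part size and threshold to at least 5 MB, and the threshold to at most 1 GB):

    // Given an org.apache.hadoop.conf.Configuration named conf:
    conf.setLong(Constants.MULTIPART_UPLOAD_SIZE_KEY, 16 * 1024 * 1024);
    conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 64 * 1024 * 1024);
    conf.setLong(Constants.MULTIPART_DOWNLOAD_SIZE_KEY, 512 * 1024);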

hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Aliyun OSS Filesystem.
*/
package org.apache.hadoop.fs.aliyun.oss;

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/OSSTestUtils.java

@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.internal.AssumptionViolatedException;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.Random;
/**
* Utility class for OSS Tests.
*/
public final class OSSTestUtils {
private OSSTestUtils() {
}
/**
* Create the test filesystem.
*
* If the test.fs.oss.name property is not set,
* tests will fail.
*
* @param conf configuration
* @return the FS
* @throws IOException
*/
public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
throws IOException {
String fsname = conf.getTrimmed(
TestOSSFileSystemContract.TEST_FS_OSS_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname);
URI testURI = null;
if (liveTest) {
testURI = URI.create(fsname);
liveTest = testURI.getScheme().equals(Constants.FS_OSS);
}
if (!liveTest) {
throw new AssumptionViolatedException("No test filesystem in "
+ TestOSSFileSystemContract.TEST_FS_OSS_NAME);
}
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
ossfs.initialize(testURI, conf);
return ossfs;
}
/**
* Generate unique test path for multiple user tests.
*
* @return root test path
*/
public static String generateUniqueTestPath() {
Long time = new Date().getTime();
Random rand = new Random();
return "/test_" + Long.toString(time) + "_"
+ Long.toString(Math.abs(rand.nextLong()));
}
}
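
These helpers only produce a live filesystem when test.fs.oss.name is set, normally via the git-ignored auth-keys.xml that the .gitignore hunk at the top of this commit excludes. A sketch of the minimal wiring a new test would need:

    // conf must carry test.fs.oss.name, e.g. oss://my-test-bucket,
    // plus the fs.oss.* credentials (names here are hypothetical).
    Configuration conf = new Configuration();
    AliyunOSSFileSystem fs = OSSTestUtils.createTestFileSystem(conf);
    Path root = new Path(OSSTestUtils.generateUniqueTestPath());
    fs.mkdirs(root);        // exercise the live bucket...
    fs.delete(root, true);  // ...and clean up after the test
    fs.close();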

hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSFileSystemContract.java

@@ -0,0 +1,253 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
/**
* Tests a live OSS system.
*
* This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest
* extends TestCase, which uses the old JUnit 3 runner; that runner doesn't
* ignore assumptions properly, making it impossible to skip the tests if
* we don't have a valid bucket.
*/
public class TestOSSFileSystemContract extends FileSystemContractBaseTest {
protected static final Logger LOG =
LoggerFactory.getLogger(TestOSSFileSystemContract.class);
public static final String TEST_FS_OSS_NAME = "test.fs.oss.name";
private static String testRootPath = OSSTestUtils.generateUniqueTestPath();
@Override
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = OSSTestUtils.createTestFileSystem(conf);
super.setUp();
}
@Override
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(super.path(testRootPath), true);
}
super.tearDown();
}
@Override
protected Path path(String path) {
if (path.startsWith("/")) {
return super.path(testRootPath + path);
} else {
return super.path(testRootPath + "/" + path);
}
}
@Override
public void testMkdirsWithUmask() throws Exception {
// not supported
}
/**
* Assert that the root directory always exists.
*
* @throws Exception on failures
*/
@Override
public void testRootDirAlwaysExists() throws Exception {
//this will throw an exception if the path is not found
fs.getFileStatus(super.path("/"));
//this catches overrides of the base exists() method that don't
//use getFileStatus() as an existence probe
assertTrue("FileSystem.exists() fails for root",
fs.exists(super.path("/")));
}
/**
* Assert that root directory renames are not allowed.
*
* @throws Exception on failures
*/
@Override
public void testRenameRootDirForbidden() throws Exception {
if (!renameSupported()) {
return;
}
rename(super.path("/"),
super.path("/test/newRootDir"),
false, true, false);
}
public void testDeleteSubdir() throws IOException {
Path parentDir = this.path("/test/hadoop");
Path file = this.path("/test/hadoop/file");
Path subdir = this.path("/test/hadoop/subdir");
this.createFile(file);
assertTrue("Created subdir", this.fs.mkdirs(subdir));
assertTrue("File exists", this.fs.exists(file));
assertTrue("Parent dir exists", this.fs.exists(parentDir));
assertTrue("Subdir exists", this.fs.exists(subdir));
assertTrue("Deleted subdir", this.fs.delete(subdir, true));
assertTrue("Parent should exist", this.fs.exists(parentDir));
assertTrue("Deleted file", this.fs.delete(file, false));
assertTrue("Parent should exist", this.fs.exists(parentDir));
}
@Override
protected boolean renameSupported() {
return true;
}
@Override
public void testRenameNonExistentPath() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/path");
Path dst = this.path("/test/new/newpath");
try {
super.rename(src, dst, false, false, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameFileMoveToNonExistentDirectory() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/file");
this.createFile(src);
Path dst = this.path("/test/new/newfile");
try {
super.rename(src, dst, false, true, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameDirectoryMoveToNonExistentDirectory() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/dir");
this.fs.mkdirs(src);
Path dst = this.path("/test/new/newdir");
try {
super.rename(src, dst, false, true, false);
fail("Should throw FileNotFoundException!");
} catch (FileNotFoundException e) {
// expected
}
}
}
@Override
public void testRenameFileMoveToExistingDirectory() throws Exception {
super.testRenameFileMoveToExistingDirectory();
}
@Override
public void testRenameFileAsExistingFile() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/file");
this.createFile(src);
Path dst = this.path("/test/new/newfile");
this.createFile(dst);
try {
super.rename(src, dst, false, true, true);
fail("Should throw FileAlreadyExistsException");
} catch (FileAlreadyExistsException e) {
// expected
}
}
}
@Override
public void testRenameDirectoryAsExistingFile() throws Exception {
if (this.renameSupported()) {
Path src = this.path("/test/hadoop/dir");
this.fs.mkdirs(src);
Path dst = this.path("/test/new/newfile");
this.createFile(dst);
try {
super.rename(src, dst, false, true, true);
fail("Should throw FileAlreadyExistsException");
} catch (FileAlreadyExistsException e) {
// expected
}
}
}
public void testGetFileStatusFileAndDirectory() throws Exception {
Path filePath = this.path("/test/oss/file1");
this.createFile(filePath);
assertTrue("Should be file", this.fs.getFileStatus(filePath).isFile());
assertFalse("Should not be directory",
this.fs.getFileStatus(filePath).isDirectory());
Path dirPath = this.path("/test/oss/dir");
this.fs.mkdirs(dirPath);
assertTrue("Should be directory",
this.fs.getFileStatus(dirPath).isDirectory());
assertFalse("Should not be file", this.fs.getFileStatus(dirPath).isFile());
}
public void testMkdirsForExistingFile() throws Exception {
Path testFile = this.path("/test/hadoop/file");
assertFalse(this.fs.exists(testFile));
this.createFile(testFile);
assertTrue(this.fs.exists(testFile));
try {
this.fs.mkdirs(testFile);
fail("Should throw FileAlreadyExistsException!");
} catch (FileAlreadyExistsException e) {
// expected
}
}
public void testWorkingDirectory() throws Exception {
Path workDir = super.path(this.getDefaultWorkingDirectory());
assertEquals(workDir, this.fs.getWorkingDirectory());
this.fs.setWorkingDirectory(super.path("."));
assertEquals(workDir, this.fs.getWorkingDirectory());
this.fs.setWorkingDirectory(super.path(".."));
assertEquals(workDir.getParent(), this.fs.getWorkingDirectory());
Path relativeDir = super.path("hadoop");
this.fs.setWorkingDirectory(relativeDir);
assertEquals(relativeDir, this.fs.getWorkingDirectory());
Path absoluteDir = super.path("/test/hadoop");
this.fs.setWorkingDirectory(absoluteDir);
assertEquals(absoluteDir, this.fs.getWorkingDirectory());
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests basic functionality for AliyunOSSInputStream, including seeking and
* reading files.
*/
public class TestOSSInputStream {
private FileSystem fs;
protected static final Logger LOG =
LoggerFactory.getLogger(TestOSSInputStream.class);
private static String testRootPath = OSSTestUtils.generateUniqueTestPath();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
fs = OSSTestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(new Path(testRootPath), true);
}
}
private Path setPath(String path) {
if (path.startsWith("/")) {
return new Path(testRootPath + path);
} else {
return new Path(testRootPath + "/" + path);
}
}
@Test
public void testSeekFile() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
long size = 5 * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, smallSeekFile, size, 256, 255);
LOG.info("5MB file created: smallSeekFile.txt");
FSDataInputStream instream = this.fs.open(smallSeekFile);
int seekTimes = 5;
LOG.info("multiple fold position seeking test...:");
for (int i = 0; i < seekTimes; i++) {
long pos = size / (seekTimes - i) - 1;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
LOG.info("random position seeking test...:");
Random rand = new Random();
for (int i = 0; i < seekTimes; i++) {
long pos = Math.abs(rand.nextLong()) % size;
LOG.info("begin seeking for pos: " + pos);
instream.seek(pos);
assertTrue("expected position at:" + pos + ", but got:"
+ instream.getPos(), instream.getPos() == pos);
LOG.info("completed seeking at pos: " + instream.getPos());
}
IOUtils.closeStream(instream);
}
@Test
public void testReadFile() throws Exception {
final int bufLen = 256;
final int sizeFlag = 5;
String filename = "readTestFile_" + sizeFlag + ".txt";
Path readTestFile = setPath("/test/" + filename);
long size = sizeFlag * 1024 * 1024;
ContractTestUtils.generateTestFile(this.fs, readTestFile, size, 256, 255);
LOG.info(sizeFlag + "MB file created: /test/" + filename);
FSDataInputStream instream = this.fs.open(readTestFile);
byte[] buf = new byte[bufLen];
long bytesRead = 0;
while (bytesRead < size) {
int bytes;
if (size - bytesRead < bufLen) {
int remaining = (int)(size - bytesRead);
bytes = instream.read(buf, 0, remaining);
} else {
bytes = instream.read(buf, 0, bufLen);
}
// Guard against premature EOF, which would otherwise loop forever.
assertTrue("premature EOF after " + bytesRead + " bytes", bytes >= 0);
bytesRead += bytes;
if (bytesRead % (1024 * 1024) == 0) {
int available = instream.available();
int remaining = (int)(size - bytesRead);
assertEquals("remaining bytes", remaining, available);
LOG.info("Bytes read: " + Math.round((double)bytesRead / (1024 * 1024))
+ " MB");
}
}
assertEquals("bytes left in stream", 0, instream.available());
IOUtils.closeStream(instream);
}
}
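
One adjacent guarantee these seek tests rely on, though they do not assert it directly: positioned reads must leave the seek position untouched. A minimal illustrative sketch against the generic FileSystem API, assuming the 5 MB smallSeekFile.txt from testSeekFile exists:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/test/smallSeekFile.txt"))) {
      byte[] window = new byte[256];
      in.seek(1024);              // moves the stream position
      in.readFully(4096, window); // positioned read leaves getPos() alone
      System.out.println("pos after positioned read: " + in.getPos()); // 1024
    }
  }
}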

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
/**
* Tests regular and multi-part upload functionality for AliyunOSSOutputStream.
*/
public class TestOSSOutputStream {
private FileSystem fs;
private static String testRootPath = OSSTestUtils.generateUniqueTestPath();
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024);
fs = OSSTestUtils.createTestFileSystem(conf);
}
@After
public void tearDown() throws Exception {
if (fs != null) {
fs.delete(new Path(testRootPath), true);
}
}
protected Path getTestPath() {
return new Path(testRootPath + "/testoss");
}
@Test
public void testRegularUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
}
@Test
public void testMultiPartUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
}
}
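
The two cases deliberately bracket the 5 MB threshold configured in setUp(): 1 MB stays below it and takes the single-request path, while 6 MB crosses it and exercises multipart upload. A hypothetical sketch of that size-based dispatch (not the shipped AliyunOSSOutputStream code):

public class UploadDispatchSketch {
  // Mirrors the values set in setUp() above.
  static final long THRESHOLD = 5L * 1024 * 1024;
  static final long PART_SIZE = 5L * 1024 * 1024;

  static String chooseUploadMode(long fileSize) {
    if (fileSize <= THRESHOLD) {
      return "single PutObject request";
    }
    // Round up: a 6 MB file needs two parts capped at 5 MB each.
    long parts = (fileSize + PART_SIZE - 1) / PART_SIZE;
    return "multipart upload in " + parts + " parts";
  }

  public static void main(String[] args) {
    System.out.println(chooseUploadMode(1024 * 1024));      // regular upload
    System.out.println(chooseUploadMode(6L * 1024 * 1024)); // 2-part upload
  }
}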

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.OSSTestUtils;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
/**
* The contract of OSS: only enabled if the test bucket is provided.
*/
public class OSSContract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "contract/oss.xml";
public static final String CONTRACT_TEST_OSS_FS_NAME =
"fs.contract.test.fs.oss";
private static String testPath = OSSTestUtils.generateUniqueTestPath();
public OSSContract(Configuration conf) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
}
@Override
public String getScheme() {
return "oss";
}
@Override
public Path getTestPath() {
return new Path(testPath);
}
}
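
Each of the suites that follow binds to this contract with a single createContract() override. For illustration, a hypothetical extra suite for root-directory semantics (not part of this commit) would follow the same pattern:

package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
 * Hypothetical OSS contract root directory tests.
 */
public class TestOSSContractRootDir extends AbstractContractRootDirectoryTest {
  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new OSSContract(conf);
  }
}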

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
/**
* OSS contract creating tests.
*/
public class TestOSSContractCreate extends AbstractContractCreateTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
@Override
public void testOverwriteEmptyDirectory() throws Throwable {
ContractTestUtils.skip(
"blobstores can't distinguish empty directories from files");
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* OSS contract deleting tests.
*/
public class TestOSSContractDelete extends AbstractContractDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* OSS contract directory tests.
*/
public class TestOSSContractMkdir extends AbstractContractMkdirTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* OSS contract opening file tests.
*/
public class TestOSSContractOpen extends AbstractContractOpenTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* OSS contract renaming tests.
*/
public class TestOSSContractRename extends AbstractContractRenameTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* OSS contract seeking tests.
*/
public class TestOSSContractSeek extends AbstractContractSeekTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new OSSContract(conf);
}
}

View File

@ -0,0 +1,105 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>10</value>
</property>
<property>
<name>fs.contract.is-blobstore</name>
<value>true</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>false</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>false</value>
</property>
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
</property>
<property>
<name>fs.oss.multipart.download.size</name>
<value>102400</value>
</property>
</configuration>

View File

@ -0,0 +1,46 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>target/build/test</value>
<description>A base for other temporary directories.</description>
<final>true</final>
</property>
<!-- Turn security off for tests by default -->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<!--
To run these tests:
# Create a file auth-keys.xml - DO NOT ADD IT TO REVISION CONTROL.
# Add the property test.fs.oss.name, pointing to an OSS filesystem URL.
# Add the credentials for the service you are testing against
(an illustrative auth-keys.xml is sketched after this file).
-->
<include xmlns="http://www.w3.org/2001/XInclude" href="auth-keys.xml">
<fallback/>
</include>
</configuration>
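
An illustrative auth-keys.xml, kept out of revision control. The two fs.oss credential key names below are assumptions for illustration only and should be checked against the module's Constants class:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>test.fs.oss.name</name>
    <value>oss://your-test-bucket/</value>
  </property>
  <property>
    <name>fs.contract.test.fs.oss</name>
    <value>oss://your-test-bucket/</value>
  </property>
  <!-- Illustrative credential properties; confirm the exact key names. -->
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>YOUR_ACCESS_KEY_ID</value>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>YOUR_ACCESS_KEY_SECRET</value>
  </property>
</configuration>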

View File

@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# log4j configuration used during build and unit tests
log4j.rootLogger=INFO,stdout
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

View File

@ -100,6 +100,12 @@
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<scope>compile</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-sls</artifactId>

View File

@ -47,6 +47,7 @@
<module>hadoop-aws</module>
<module>hadoop-kafka</module>
<module>hadoop-azure-datalake</module>
<module>hadoop-aliyun</module>
</modules>
<build>