HADOOP-14999. AliyunOSS: provide one asynchronous multi-part based uploading mechanism. Contributed by Genmao Yu.

(cherry picked from commit 6542d17ea4)
This commit is contained in:
Sammi Chen 2018-03-30 20:23:05 +08:00
parent 99b5b9dce1
commit e96c7bf82d
10 changed files with 457 additions and 251 deletions

View File

@ -35,8 +35,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
public class AliyunCredentialsProvider implements CredentialsProvider { public class AliyunCredentialsProvider implements CredentialsProvider {
private Credentials credentials = null; private Credentials credentials = null;
public AliyunCredentialsProvider(Configuration conf) public AliyunCredentialsProvider(Configuration conf) throws IOException {
throws IOException {
String accessKeyId; String accessKeyId;
String accessKeySecret; String accessKeySecret;
String securityToken; String securityToken;

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.model.PartETag;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
/**
* Asynchronous multi-part based uploading mechanism to support huge file
* which is larger than 5GB. Data will be buffered on local disk, then uploaded
* to OSS in {@link #close()} method.
*/
public class AliyunOSSBlockOutputStream extends OutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
private AliyunOSSFileSystemStore store;
private Configuration conf;
private boolean closed;
private String key;
private File blockFile;
private List<File> blockFiles = new ArrayList<>();
private long blockSize;
private int blockId = 0;
private long blockWritten = 0L;
private String uploadId = null;
private final List<ListenableFuture<PartETag>> partETagsFutures;
private final ListeningExecutorService executorService;
private OutputStream blockStream;
private final byte[] singleByte = new byte[1];
public AliyunOSSBlockOutputStream(Configuration conf,
AliyunOSSFileSystemStore store,
String key,
Long blockSize,
ExecutorService executorService) throws IOException {
this.store = store;
this.conf = conf;
this.key = key;
this.blockSize = blockSize;
this.blockFile = newBlockFile();
this.blockStream =
new BufferedOutputStream(new FileOutputStream(blockFile));
this.partETagsFutures = new ArrayList<>(2);
this.executorService = MoreExecutors.listeningDecorator(executorService);
}
private File newBlockFile() throws IOException {
return AliyunOSSUtils.createTmpFileForWrite(
String.format("oss-block-%04d-", blockId), blockSize, conf);
}
@Override
public synchronized void flush() throws IOException {
blockStream.flush();
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
blockStream.flush();
blockStream.close();
if (!blockFiles.contains(blockFile)) {
blockFiles.add(blockFile);
}
try {
if (blockFiles.size() == 1) {
// just upload it directly
store.uploadObject(key, blockFile);
} else {
if (blockWritten > 0) {
ListenableFuture<PartETag> partETagFuture =
executorService.submit(() -> {
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
blockId + 1);
return partETag;
});
partETagsFutures.add(partETagFuture);
}
// wait for the partial uploads to finish
final List<PartETag> partETags = waitForAllPartUploads();
if (null == partETags) {
throw new IOException("Failed to multipart upload to oss, abort it.");
}
store.completeMultipartUpload(key, uploadId, partETags);
}
} finally {
for (File tFile: blockFiles) {
if (tFile.exists() && !tFile.delete()) {
LOG.warn("Failed to delete temporary file {}", tFile);
}
}
closed = true;
}
}
@Override
public void write(int b) throws IOException {
singleByte[0] = (byte)b;
write(singleByte, 0, 1);
}
@Override
public synchronized void write(byte[] b, int off, int len)
throws IOException {
if (closed) {
throw new IOException("Stream closed.");
}
try {
blockStream.write(b, off, len);
blockWritten += len;
if (blockWritten >= blockSize) {
uploadCurrentPart();
blockWritten = 0L;
}
} finally {
for (File tFile: blockFiles) {
if (tFile.exists() && !tFile.delete()) {
LOG.warn("Failed to delete temporary file {}", tFile);
}
}
}
}
private void uploadCurrentPart() throws IOException {
blockFiles.add(blockFile);
blockStream.flush();
blockStream.close();
if (blockId == 0) {
uploadId = store.getUploadId(key);
}
ListenableFuture<PartETag> partETagFuture =
executorService.submit(() -> {
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
blockId + 1);
return partETag;
});
partETagsFutures.add(partETagFuture);
blockFile = newBlockFile();
blockId++;
blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
}
/**
* Block awaiting all outstanding uploads to complete.
* @return list of results
* @throws IOException IO Problems
*/
private List<PartETag> waitForAllPartUploads() throws IOException {
LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
LOG.warn("Interrupted partUpload", ie);
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) {
//there is no way of recovering so abort
//cancel all partUploads
LOG.debug("While waiting for upload completion", ee);
LOG.debug("Cancelling futures");
for (ListenableFuture<PartETag> future : partETagsFutures) {
future.cancel(true);
}
//abort multipartupload
store.abortMultipartUpload(key, uploadId);
throw new IOException("Multi-part upload with id '" + uploadId
+ "' to " + key, ee);
}
}
}

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption;
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory; import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*; import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
@ -69,6 +71,7 @@ public class AliyunOSSFileSystem extends FileSystem {
private URI uri; private URI uri;
private String bucket; private String bucket;
private Path workingDir; private Path workingDir;
private int blockOutputActiveBlocks;
private AliyunOSSFileSystemStore store; private AliyunOSSFileSystemStore store;
private int maxKeys; private int maxKeys;
private int maxReadAheadPartNumber; private int maxReadAheadPartNumber;
@ -125,8 +128,15 @@ public class AliyunOSSFileSystem extends FileSystem {
// this means the file is not found // this means the file is not found
} }
return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
store, key, progress, statistics), (Statistics)(null)); MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
return new FSDataOutputStream(
new AliyunOSSBlockOutputStream(getConf(),
store,
key,
uploadPartSize,
new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true)), (Statistics)(null));
} }
/** /**
@ -149,9 +159,8 @@ public class AliyunOSSFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Not a directory: " + parent); throw new FileAlreadyExistsException("Not a directory: " + parent);
} }
} }
return create(path, permission, return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
flags.contains(CreateFlag.OVERWRITE), bufferSize, bufferSize, replication, blockSize, progress);
replication, blockSize, progress);
} }
@Override @Override
@ -270,7 +279,7 @@ public class AliyunOSSFileSystem extends FileSystem {
} }
} else if (objectRepresentsDirectory(key, meta.getContentLength())) { } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(), return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(),
qualifiedPath); qualifiedPath);
} else { } else {
return new FileStatus(meta.getContentLength(), false, 1, return new FileStatus(meta.getContentLength(), false, 1,
getDefaultBlockSize(path), meta.getLastModified().getTime(), getDefaultBlockSize(path), meta.getLastModified().getTime(),
@ -318,6 +327,10 @@ public class AliyunOSSFileSystem extends FileSystem {
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
workingDir = new Path("/user", workingDir = new Path("/user",
System.getProperty("user.name")).makeQualified(uri, null); System.getProperty("user.name")).makeQualified(uri, null);
long keepAliveTime = longOption(conf,
KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
blockOutputActiveBlocks = intOption(conf,
UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
store = new AliyunOSSFileSystemStore(); store = new AliyunOSSFileSystemStore();
store.initialize(name, conf, statistics); store.initialize(name, conf, statistics);
@ -335,7 +348,8 @@ public class AliyunOSSFileSystem extends FileSystem {
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT); Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared"); threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS,
"oss-transfer-shared");
maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf, maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY, Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
@ -490,12 +504,12 @@ public class AliyunOSSFileSystem extends FileSystem {
if (status.isFile()) { if (status.isFile()) {
LOG.debug("{} is a File", qualifiedPath); LOG.debug("{} is a File", qualifiedPath);
final BlockLocation[] locations = getFileBlockLocations(status, final BlockLocation[] locations = getFileBlockLocations(status,
0, status.getLen()); 0, status.getLen());
return store.singleStatusRemoteIterator(filter.accept(f) ? status : null, return store.singleStatusRemoteIterator(filter.accept(f) ? status : null,
locations); locations);
} else { } else {
return store.createLocatedFileStatusIterator(key, maxKeys, this, filter, return store.createLocatedFileStatusIterator(key, maxKeys, this, filter,
acceptor, recursive ? null : "/"); acceptor, recursive ? null : "/");
} }
} }

View File

@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.aliyun.oss; package org.apache.hadoop.fs.aliyun.oss;
import com.aliyun.oss.ClientConfiguration; import com.aliyun.oss.ClientConfiguration;
@ -62,8 +63,11 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.Serializable;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -83,7 +87,6 @@ public class AliyunOSSFileSystemStore {
private String bucketName; private String bucketName;
private long uploadPartSize; private long uploadPartSize;
private long multipartThreshold; private long multipartThreshold;
private long partSize;
private int maxKeys; private int maxKeys;
private String serverSideEncryptionAlgorithm; private String serverSideEncryptionAlgorithm;
@ -143,28 +146,18 @@ public class AliyunOSSFileSystemStore {
String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
if (StringUtils.isEmpty(endPoint)) { if (StringUtils.isEmpty(endPoint)) {
throw new IllegalArgumentException("Aliyun OSS endpoint should not be " + throw new IllegalArgumentException("Aliyun OSS endpoint should not be " +
"null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
} }
CredentialsProvider provider = CredentialsProvider provider =
AliyunOSSUtils.getCredentialsProvider(conf); AliyunOSSUtils.getCredentialsProvider(conf);
ossClient = new OSSClient(endPoint, provider, clientConf); ossClient = new OSSClient(endPoint, provider, clientConf);
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
MULTIPART_UPLOAD_SIZE_DEFAULT); MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
MULTIPART_UPLOAD_SIZE_DEFAULT);
if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
}
serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm =
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
if (uploadPartSize < 5 * 1024 * 1024) {
LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
uploadPartSize = 5 * 1024 * 1024;
}
if (multipartThreshold < 5 * 1024 * 1024) { if (multipartThreshold < 5 * 1024 * 1024) {
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
multipartThreshold = 5 * 1024 * 1024; multipartThreshold = 5 * 1024 * 1024;
@ -419,71 +412,6 @@ public class AliyunOSSFileSystemStore {
} }
} }
/**
* Upload a file as an OSS object, using multipart upload.
*
* @param key object key.
* @param file local file to upload.
* @throws IOException if failed to upload object.
*/
public void multipartUploadObject(String key, File file) throws IOException {
File object = file.getAbsoluteFile();
long dataLen = object.length();
long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
int partNum = (int) (dataLen / realPartSize);
if (dataLen % realPartSize != 0) {
partNum += 1;
}
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
ObjectMetadata meta = new ObjectMetadata();
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
}
initiateMultipartUploadRequest.setObjectMetadata(meta);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
List<PartETag> partETags = new ArrayList<PartETag>();
String uploadId = initiateMultipartUploadResult.getUploadId();
try {
for (int i = 0; i < partNum; i++) {
// TODO: Optimize this, avoid opening the object multiple times
FileInputStream fis = new FileInputStream(object);
try {
long skipBytes = realPartSize * i;
AliyunOSSUtils.skipFully(fis, skipBytes);
long size = (realPartSize < dataLen - skipBytes) ?
realPartSize : dataLen - skipBytes;
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(key);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(fis);
uploadPartRequest.setPartSize(size);
uploadPartRequest.setPartNumber(i + 1);
UploadPartResult uploadPartResult =
ossClient.uploadPart(uploadPartRequest);
statistics.incrementWriteOps(1);
partETags.add(uploadPartResult.getPartETag());
} finally {
fis.close();
}
}
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key,
uploadId, partETags);
CompleteMultipartUploadResult completeMultipartUploadResult =
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
LOG.debug(completeMultipartUploadResult.getETag());
} catch (OSSException | ClientException e) {
AbortMultipartUploadRequest abortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucketName, key, uploadId);
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
}
}
/** /**
* list objects. * list objects.
* *
@ -494,7 +422,7 @@ public class AliyunOSSFileSystemStore {
* @return a list of matches. * @return a list of matches.
*/ */
public ObjectListing listObjects(String prefix, int maxListingLength, public ObjectListing listObjects(String prefix, int maxListingLength,
String marker, boolean recursive) { String marker, boolean recursive) {
String delimiter = recursive ? null : "/"; String delimiter = recursive ? null : "/";
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
@ -605,7 +533,7 @@ public class AliyunOSSFileSystemStore {
if (hasNext()) { if (hasNext()) {
FileStatus status = batchIterator.next(); FileStatus status = batchIterator.next();
BlockLocation[] locations = fs.getFileBlockLocations(status, BlockLocation[] locations = fs.getFileBlockLocations(status,
0, status.getLen()); 0, status.getLen());
return new LocatedFileStatus( return new LocatedFileStatus(
status, status.isFile() ? locations : null); status, status.isFile() ? locations : null);
} else { } else {
@ -626,7 +554,7 @@ public class AliyunOSSFileSystemStore {
List<FileStatus> stats = new ArrayList<>( List<FileStatus> stats = new ArrayList<>(
listing.getObjectSummaries().size() + listing.getObjectSummaries().size() +
listing.getCommonPrefixes().size()); listing.getCommonPrefixes().size());
for(OSSObjectSummary summary: listing.getObjectSummaries()) { for (OSSObjectSummary summary : listing.getObjectSummaries()) {
String key = summary.getKey(); String key = summary.getKey();
Path path = fs.makeQualified(new Path("/" + key)); Path path = fs.makeQualified(new Path("/" + key));
if (filter.accept(path) && acceptor.accept(path, summary)) { if (filter.accept(path) && acceptor.accept(path, summary)) {
@ -637,7 +565,7 @@ public class AliyunOSSFileSystemStore {
} }
} }
for(String commonPrefix: listing.getCommonPrefixes()) { for (String commonPrefix : listing.getCommonPrefixes()) {
Path path = fs.makeQualified(new Path("/" + commonPrefix)); Path path = fs.makeQualified(new Path("/" + commonPrefix));
if (filter.accept(path) && acceptor.accept(path, commonPrefix)) { if (filter.accept(path) && acceptor.accept(path, commonPrefix)) {
FileStatus status = new FileStatus(0, true, 1, 0, 0, path); FileStatus status = new FileStatus(0, true, 1, 0, 0, path);
@ -656,4 +584,83 @@ public class AliyunOSSFileSystemStore {
} }
}; };
} }
public PartETag uploadPart(File file, String key, String uploadId, int idx)
throws IOException {
InputStream instream = null;
Exception caught = null;
int tries = 3;
while (tries > 0) {
try {
instream = new FileInputStream(file);
UploadPartRequest uploadRequest = new UploadPartRequest();
uploadRequest.setBucketName(bucketName);
uploadRequest.setKey(key);
uploadRequest.setUploadId(uploadId);
uploadRequest.setInputStream(instream);
uploadRequest.setPartSize(file.length());
uploadRequest.setPartNumber(idx);
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
return uploadResult.getPartETag();
} catch (Exception e) {
LOG.debug("Failed to upload "+ file.getPath() +", " +
"try again.", e);
caught = e;
} finally {
if (instream != null) {
instream.close();
instream = null;
}
}
tries--;
}
assert (caught != null);
throw new IOException("Failed to upload " + file.getPath() +
" for 3 times.", caught);
}
/**
* Initiate multipart upload.
*/
public String getUploadId(String key) {
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
InitiateMultipartUploadResult initiateMultipartUploadResult =
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
return initiateMultipartUploadResult.getUploadId();
}
/**
* Complete the specific multipart upload.
*/
public CompleteMultipartUploadResult completeMultipartUpload(String key,
String uploadId, List<PartETag> partETags) {
Collections.sort(partETags, new PartNumberAscendComparator());
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key, uploadId,
partETags);
return ossClient.completeMultipartUpload(completeMultipartUploadRequest);
}
/**
* Abort the specific multipart upload.
*/
public void abortMultipartUpload(String key, String uploadId) {
AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
bucketName, key, uploadId);
ossClient.abortMultipartUpload(request);
}
private static class PartNumberAscendComparator
implements Comparator<PartETag>, Serializable {
@Override
public int compare(PartETag o1, PartETag o2) {
if (o1.getPartNumber() > o2.getPartNumber()) {
return 1;
} else {
return -1;
}
}
}
} }

View File

@ -1,111 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
/**
* The output stream for OSS blob system.
* Data will be buffered on local disk, then uploaded to OSS in
* {@link #close()} method.
*/
public class AliyunOSSOutputStream extends OutputStream {
public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
private AliyunOSSFileSystemStore store;
private final String key;
private Statistics statistics;
private Progressable progress;
private long partSizeThreshold;
private LocalDirAllocator dirAlloc;
private boolean closed;
private File tmpFile;
private BufferedOutputStream backupStream;
public AliyunOSSOutputStream(Configuration conf,
AliyunOSSFileSystemStore store, String key, Progressable progress,
Statistics statistics) throws IOException {
this.store = store;
this.key = key;
// The caller cann't get any progress information
this.progress = progress;
this.statistics = statistics;
partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
}
dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
tmpFile = dirAlloc.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
closed = false;
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
if (backupStream != null) {
backupStream.close();
}
long dataLen = tmpFile.length();
try {
if (dataLen <= partSizeThreshold) {
store.uploadObject(key, tmpFile);
} else {
store.multipartUploadObject(key, tmpFile);
}
} finally {
if (!tmpFile.delete()) {
LOG.warn("Can not delete file: " + tmpFile);
}
}
}
@Override
public synchronized void flush() throws IOException {
backupStream.flush();
}
@Override
public synchronized void write(int b) throws IOException {
backupStream.write(b);
statistics.incrementBytesWritten(1);
}
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.fs.aliyun.oss; package org.apache.hadoop.fs.aliyun.oss;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import com.aliyun.oss.common.auth.CredentialsProvider; import com.aliyun.oss.common.auth.CredentialsProvider;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.ProviderUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -36,6 +38,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
final public class AliyunOSSUtils { final public class AliyunOSSUtils {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AliyunOSSUtils.class); LoggerFactory.getLogger(AliyunOSSUtils.class);
private static LocalDirAllocator directoryAllocator;
private AliyunOSSUtils() { private AliyunOSSUtils() {
} }
@ -74,31 +77,6 @@ final public class AliyunOSSUtils {
} }
} }
/**
* Skip the requested number of bytes or fail if there are no enough bytes
* left. This allows for the possibility that {@link InputStream#skip(long)}
* may not skip as many bytes as requested (most likely because of reaching
* EOF).
*
* @param is the input stream to skip.
* @param n the number of bytes to skip.
* @throws IOException thrown when skipped less number of bytes.
*/
public static void skipFully(InputStream is, long n) throws IOException {
long total = 0;
long cur = 0;
do {
cur = is.skip(n - total);
total += cur;
} while((total < n) && (cur > 0));
if (total < n) {
throw new IOException("Failed to skip " + n + " bytes, possibly due " +
"to EOF.");
}
}
/** /**
* Calculate a proper size of multipart piece. If <code>minPartSize</code> * Calculate a proper size of multipart piece. If <code>minPartSize</code>
* is too small, the number of multipart pieces may exceed the limit of * is too small, the number of multipart pieces may exceed the limit of
@ -126,7 +104,7 @@ final public class AliyunOSSUtils {
throws IOException { throws IOException {
CredentialsProvider credentials; CredentialsProvider credentials;
String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
if (StringUtils.isEmpty(className)) { if (StringUtils.isEmpty(className)) {
Configuration newConf = Configuration newConf =
ProviderUtils.excludeIncompatibleCredentialProviders(conf, ProviderUtils.excludeIncompatibleCredentialProviders(conf,
@ -151,7 +129,7 @@ final public class AliyunOSSUtils {
throw new IOException(String.format("%s constructor exception. A " + throw new IOException(String.format("%s constructor exception. A " +
"class specified in %s must provide an accessible constructor " + "class specified in %s must provide an accessible constructor " +
"accepting URI and Configuration, or an accessible default " + "accepting URI and Configuration, or an accessible default " +
"constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), "constructor.", className, CREDENTIALS_PROVIDER_KEY),
e); e);
} catch (ReflectiveOperationException | IllegalArgumentException e) { } catch (ReflectiveOperationException | IllegalArgumentException e) {
throw new IOException(className + " instantiation exception.", e); throw new IOException(className + " instantiation exception.", e);
@ -188,4 +166,85 @@ final public class AliyunOSSUtils {
final long size) { final long size) {
return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L; return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
} }
/**
* Demand create the directory allocator, then create a temporary file.
* @param path prefix for the temporary file
* @param size the size of the file that is going to be written
* @param conf the Configuration object
* @return a unique temporary file
* @throws IOException IO problems
*/
public static File createTmpFileForWrite(String path, long size,
Configuration conf) throws IOException {
if (conf.get(BUFFER_DIR_KEY) == null) {
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
}
if (directoryAllocator == null) {
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
}
return directoryAllocator.createTmpFileForWrite(path, size, conf);
}
/**
* Get a integer option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static int intOption(Configuration conf, String key, int defVal, int min) {
int v = conf.getInt(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
LOG.debug("Value of {} is {}", key, v);
return v;
}
/**
* Get a long option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
* @param defVal default value
* @param min minimum value
* @return the value
* @throws IllegalArgumentException if the value is below the minimum
*/
static long longOption(Configuration conf, String key, long defVal,
long min) {
long v = conf.getLong(key, defVal);
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
LOG.debug("Value of {} is {}", key, v);
return v;
}
/**
* Get a size property from the configuration: this property must
* be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
* If it is too small, it is rounded up to that minimum, and a warning
* printed.
* @param conf configuration
* @param property property name
* @param defVal default value
* @return the value, guaranteed to be above the minimum size
*/
public static long getMultipartSizeProperty(Configuration conf,
String property, long defVal) {
long partSize = conf.getLong(property, defVal);
if (partSize < MULTIPART_MIN_SIZE) {
LOG.warn("{} must be at least 100 KB; configured value is {}",
property, partSize);
partSize = MULTIPART_MIN_SIZE;
} else if (partSize > Integer.MAX_VALUE) {
LOG.warn("oss: {} capped to ~2.14GB(maximum allowed size with " +
"current output mechanism)", MULTIPART_UPLOAD_PART_SIZE_KEY);
partSize = Integer.MAX_VALUE;
}
return partSize;
}
} }

View File

@ -31,10 +31,10 @@ public final class Constants {
// User agent // User agent
public static final String USER_AGENT_PREFIX = "fs.oss.user.agent.prefix"; public static final String USER_AGENT_PREFIX = "fs.oss.user.agent.prefix";
public static final String USER_AGENT_PREFIX_DEFAULT = public static final String USER_AGENT_PREFIX_DEFAULT =
VersionInfoUtils.getDefaultUserAgent(); VersionInfoUtils.getDefaultUserAgent();
// Class of credential provider // Class of credential provider
public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY = public static final String CREDENTIALS_PROVIDER_KEY =
"fs.oss.credentials.provider"; "fs.oss.credentials.provider";
// OSS access verification // OSS access verification
@ -82,10 +82,14 @@ public final class Constants {
public static final int MAX_PAGING_KEYS_DEFAULT = 1000; public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
// Size of each of or multipart pieces in bytes // Size of each of or multipart pieces in bytes
public static final String MULTIPART_UPLOAD_SIZE_KEY = public static final String MULTIPART_UPLOAD_PART_SIZE_KEY =
"fs.oss.multipart.upload.size"; "fs.oss.multipart.upload.size";
public static final long MULTIPART_UPLOAD_PART_SIZE_DEFAULT =
104857600; // 100 MB
/** The minimum multipart size which Aliyun OSS supports. */
public static final int MULTIPART_MIN_SIZE = 100 * 1024;
public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000; public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
// Minimum size in bytes before we start a multipart uploads or copy // Minimum size in bytes before we start a multipart uploads or copy
@ -96,7 +100,6 @@ public final class Constants {
public static final String MULTIPART_DOWNLOAD_SIZE_KEY = public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
"fs.oss.multipart.download.size"; "fs.oss.multipart.download.size";
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024; public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY = public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
@ -139,9 +142,14 @@ public final class Constants {
public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
public static final String FS_OSS = "oss"; public static final String FS_OSS = "oss";
public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; public static final String KEEPALIVE_TIME_KEY =
public static final int MAX_RETRIES = 10; "fs.oss.threads.keepalivetime";
public static final int KEEPALIVE_TIME_DEFAULT = 60;
public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
"fs.oss.upload.active.blocks";
public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;
} }

View File

@ -30,10 +30,13 @@ import org.junit.rules.Timeout;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
/** /**
* Tests regular and multi-part upload functionality for AliyunOSSOutputStream. * Tests regular and multi-part upload functionality for
* AliyunOSSBlockOutputStream.
*/ */
public class TestAliyunOSSOutputStream { public class TestAliyunOSSBlockOutputStream {
private FileSystem fs; private FileSystem fs;
private static String testRootPath = private static String testRootPath =
AliyunOSSTestUtils.generateUniqueTestPath(); AliyunOSSTestUtils.generateUniqueTestPath();
@ -45,7 +48,7 @@ public class TestAliyunOSSOutputStream {
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024); conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
fs = AliyunOSSTestUtils.createTestFileSystem(conf); fs = AliyunOSSTestUtils.createTestFileSystem(conf);
} }
@ -56,18 +59,39 @@ public class TestAliyunOSSOutputStream {
} }
} }
protected Path getTestPath() { private Path getTestPath() {
return new Path(testRootPath + "/test-aliyun-oss"); return new Path(testRootPath + "/test-aliyun-oss");
} }
@Test
public void testZeroByteUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
}
@Test @Test
public void testRegularUpload() throws IOException { public void testRegularUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
} }
@Test @Test
public void testMultiPartUpload() throws IOException { public void testMultiPartUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
6 * 1024 * 1024 - 1);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
6 * 1024 * 1024 + 1);
}
@Test
public void testHugeUpload() throws IOException {
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
} }
@Test @Test

View File

@ -123,15 +123,15 @@ public class TestAliyunOSSInputStream {
+ fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0); + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0);
assertTrue("expected position at:" assertTrue("expected position at:"
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(), + in.getExpectNextPos(),
in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
fsDataInputStream.seek(4 * 1024 * 1024); fsDataInputStream.seek(4 * 1024 * 1024);
assertTrue("expected position at:" + 4 * 1024 * 1024 assertTrue("expected position at:" + 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:"
+ in.getExpectNextPos(), + in.getExpectNextPos(),
in.getExpectNextPos() == 4 * 1024 * 1024 in.getExpectNextPos() == 4 * 1024 * 1024
+ Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT);
IOUtils.closeStream(fsDataInputStream); IOUtils.closeStream(fsDataInputStream);
} }

View File

@ -33,7 +33,7 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
protected Configuration createConfiguration() { protected Configuration createConfiguration() {
Configuration newConf = super.createConfiguration(); Configuration newConf = super.createConfiguration();
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING); newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
return newConf; return newConf;
} }