HADOOP-14999. AliyunOSS: provide one asynchronous multi-part based uploading mechanism. Contributed by Genmao Yu.
This commit is contained in:
parent
2216bde322
commit
6542d17ea4
|
@ -35,8 +35,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
|||
public class AliyunCredentialsProvider implements CredentialsProvider {
|
||||
private Credentials credentials = null;
|
||||
|
||||
public AliyunCredentialsProvider(Configuration conf)
|
||||
throws IOException {
|
||||
public AliyunCredentialsProvider(Configuration conf) throws IOException {
|
||||
String accessKeyId;
|
||||
String accessKeySecret;
|
||||
String securityToken;
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.model.PartETag;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* Asynchronous multi-part based uploading mechanism to support huge file
|
||||
* which is larger than 5GB. Data will be buffered on local disk, then uploaded
|
||||
* to OSS in {@link #close()} method.
|
||||
*/
|
||||
public class AliyunOSSBlockOutputStream extends OutputStream {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
|
||||
private AliyunOSSFileSystemStore store;
|
||||
private Configuration conf;
|
||||
private boolean closed;
|
||||
private String key;
|
||||
private File blockFile;
|
||||
private List<File> blockFiles = new ArrayList<>();
|
||||
private long blockSize;
|
||||
private int blockId = 0;
|
||||
private long blockWritten = 0L;
|
||||
private String uploadId = null;
|
||||
private final List<ListenableFuture<PartETag>> partETagsFutures;
|
||||
private final ListeningExecutorService executorService;
|
||||
private OutputStream blockStream;
|
||||
private final byte[] singleByte = new byte[1];
|
||||
|
||||
public AliyunOSSBlockOutputStream(Configuration conf,
|
||||
AliyunOSSFileSystemStore store,
|
||||
String key,
|
||||
Long blockSize,
|
||||
ExecutorService executorService) throws IOException {
|
||||
this.store = store;
|
||||
this.conf = conf;
|
||||
this.key = key;
|
||||
this.blockSize = blockSize;
|
||||
this.blockFile = newBlockFile();
|
||||
this.blockStream =
|
||||
new BufferedOutputStream(new FileOutputStream(blockFile));
|
||||
this.partETagsFutures = new ArrayList<>(2);
|
||||
this.executorService = MoreExecutors.listeningDecorator(executorService);
|
||||
}
|
||||
|
||||
private File newBlockFile() throws IOException {
|
||||
return AliyunOSSUtils.createTmpFileForWrite(
|
||||
String.format("oss-block-%04d-", blockId), blockSize, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void flush() throws IOException {
|
||||
blockStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
blockStream.flush();
|
||||
blockStream.close();
|
||||
if (!blockFiles.contains(blockFile)) {
|
||||
blockFiles.add(blockFile);
|
||||
}
|
||||
|
||||
try {
|
||||
if (blockFiles.size() == 1) {
|
||||
// just upload it directly
|
||||
store.uploadObject(key, blockFile);
|
||||
} else {
|
||||
if (blockWritten > 0) {
|
||||
ListenableFuture<PartETag> partETagFuture =
|
||||
executorService.submit(() -> {
|
||||
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
|
||||
blockId + 1);
|
||||
return partETag;
|
||||
});
|
||||
partETagsFutures.add(partETagFuture);
|
||||
}
|
||||
// wait for the partial uploads to finish
|
||||
final List<PartETag> partETags = waitForAllPartUploads();
|
||||
if (null == partETags) {
|
||||
throw new IOException("Failed to multipart upload to oss, abort it.");
|
||||
}
|
||||
store.completeMultipartUpload(key, uploadId, partETags);
|
||||
}
|
||||
} finally {
|
||||
for (File tFile: blockFiles) {
|
||||
if (tFile.exists() && !tFile.delete()) {
|
||||
LOG.warn("Failed to delete temporary file {}", tFile);
|
||||
}
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
singleByte[0] = (byte)b;
|
||||
write(singleByte, 0, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void write(byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Stream closed.");
|
||||
}
|
||||
try {
|
||||
blockStream.write(b, off, len);
|
||||
blockWritten += len;
|
||||
if (blockWritten >= blockSize) {
|
||||
uploadCurrentPart();
|
||||
blockWritten = 0L;
|
||||
}
|
||||
} finally {
|
||||
for (File tFile: blockFiles) {
|
||||
if (tFile.exists() && !tFile.delete()) {
|
||||
LOG.warn("Failed to delete temporary file {}", tFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadCurrentPart() throws IOException {
|
||||
blockFiles.add(blockFile);
|
||||
blockStream.flush();
|
||||
blockStream.close();
|
||||
if (blockId == 0) {
|
||||
uploadId = store.getUploadId(key);
|
||||
}
|
||||
ListenableFuture<PartETag> partETagFuture =
|
||||
executorService.submit(() -> {
|
||||
PartETag partETag = store.uploadPart(blockFile, key, uploadId,
|
||||
blockId + 1);
|
||||
return partETag;
|
||||
});
|
||||
partETagsFutures.add(partETagFuture);
|
||||
blockFile = newBlockFile();
|
||||
blockId++;
|
||||
blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
|
||||
}
|
||||
|
||||
/**
|
||||
* Block awaiting all outstanding uploads to complete.
|
||||
* @return list of results
|
||||
* @throws IOException IO Problems
|
||||
*/
|
||||
private List<PartETag> waitForAllPartUploads() throws IOException {
|
||||
LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
|
||||
try {
|
||||
return Futures.allAsList(partETagsFutures).get();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Interrupted partUpload", ie);
|
||||
Thread.currentThread().interrupt();
|
||||
return null;
|
||||
} catch (ExecutionException ee) {
|
||||
//there is no way of recovering so abort
|
||||
//cancel all partUploads
|
||||
LOG.debug("While waiting for upload completion", ee);
|
||||
LOG.debug("Cancelling futures");
|
||||
for (ListenableFuture<PartETag> future : partETagsFutures) {
|
||||
future.cancel(true);
|
||||
}
|
||||
//abort multipartupload
|
||||
store.abortMultipartUpload(key, uploadId);
|
||||
throw new IOException("Multi-part upload with id '" + uploadId
|
||||
+ "' to " + key, ee);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -56,6 +56,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory;
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
||||
|
||||
|
@ -69,6 +71,7 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||
private URI uri;
|
||||
private String bucket;
|
||||
private Path workingDir;
|
||||
private int blockOutputActiveBlocks;
|
||||
private AliyunOSSFileSystemStore store;
|
||||
private int maxKeys;
|
||||
private int maxReadAheadPartNumber;
|
||||
|
@ -125,8 +128,15 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||
// this means the file is not found
|
||||
}
|
||||
|
||||
return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
|
||||
store, key, progress, statistics), (Statistics)(null));
|
||||
long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||
return new FSDataOutputStream(
|
||||
new AliyunOSSBlockOutputStream(getConf(),
|
||||
store,
|
||||
key,
|
||||
uploadPartSize,
|
||||
new SemaphoredDelegatingExecutor(boundedThreadPool,
|
||||
blockOutputActiveBlocks, true)), (Statistics)(null));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -149,9 +159,8 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||
throw new FileAlreadyExistsException("Not a directory: " + parent);
|
||||
}
|
||||
}
|
||||
return create(path, permission,
|
||||
flags.contains(CreateFlag.OVERWRITE), bufferSize,
|
||||
replication, blockSize, progress);
|
||||
return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
|
||||
bufferSize, replication, blockSize, progress);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -318,6 +327,10 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||
uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
|
||||
workingDir = new Path("/user",
|
||||
System.getProperty("user.name")).makeQualified(uri, null);
|
||||
long keepAliveTime = longOption(conf,
|
||||
KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0);
|
||||
blockOutputActiveBlocks = intOption(conf,
|
||||
UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
|
||||
|
||||
store = new AliyunOSSFileSystemStore();
|
||||
store.initialize(name, conf, statistics);
|
||||
|
@ -335,7 +348,8 @@ public class AliyunOSSFileSystem extends FileSystem {
|
|||
Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT);
|
||||
|
||||
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
|
||||
threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared");
|
||||
threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS,
|
||||
"oss-transfer-shared");
|
||||
|
||||
maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf,
|
||||
Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY,
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import com.aliyun.oss.ClientConfiguration;
|
||||
|
@ -62,8 +63,11 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.Serializable;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
@ -83,7 +87,6 @@ public class AliyunOSSFileSystemStore {
|
|||
private String bucketName;
|
||||
private long uploadPartSize;
|
||||
private long multipartThreshold;
|
||||
private long partSize;
|
||||
private int maxKeys;
|
||||
private String serverSideEncryptionAlgorithm;
|
||||
|
||||
|
@ -148,23 +151,13 @@ public class AliyunOSSFileSystemStore {
|
|||
CredentialsProvider provider =
|
||||
AliyunOSSUtils.getCredentialsProvider(conf);
|
||||
ossClient = new OSSClient(endPoint, provider, clientConf);
|
||||
uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
||||
MULTIPART_UPLOAD_SIZE_DEFAULT);
|
||||
uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
|
||||
MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
|
||||
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
|
||||
partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
||||
MULTIPART_UPLOAD_SIZE_DEFAULT);
|
||||
if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
|
||||
partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
|
||||
}
|
||||
serverSideEncryptionAlgorithm =
|
||||
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
|
||||
|
||||
if (uploadPartSize < 5 * 1024 * 1024) {
|
||||
LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
|
||||
uploadPartSize = 5 * 1024 * 1024;
|
||||
}
|
||||
|
||||
if (multipartThreshold < 5 * 1024 * 1024) {
|
||||
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
|
||||
multipartThreshold = 5 * 1024 * 1024;
|
||||
|
@ -419,71 +412,6 @@ public class AliyunOSSFileSystemStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a file as an OSS object, using multipart upload.
|
||||
*
|
||||
* @param key object key.
|
||||
* @param file local file to upload.
|
||||
* @throws IOException if failed to upload object.
|
||||
*/
|
||||
public void multipartUploadObject(String key, File file) throws IOException {
|
||||
File object = file.getAbsoluteFile();
|
||||
long dataLen = object.length();
|
||||
long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
|
||||
int partNum = (int) (dataLen / realPartSize);
|
||||
if (dataLen % realPartSize != 0) {
|
||||
partNum += 1;
|
||||
}
|
||||
|
||||
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
||||
new InitiateMultipartUploadRequest(bucketName, key);
|
||||
ObjectMetadata meta = new ObjectMetadata();
|
||||
if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
|
||||
meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
||||
}
|
||||
initiateMultipartUploadRequest.setObjectMetadata(meta);
|
||||
InitiateMultipartUploadResult initiateMultipartUploadResult =
|
||||
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
||||
List<PartETag> partETags = new ArrayList<PartETag>();
|
||||
String uploadId = initiateMultipartUploadResult.getUploadId();
|
||||
|
||||
try {
|
||||
for (int i = 0; i < partNum; i++) {
|
||||
// TODO: Optimize this, avoid opening the object multiple times
|
||||
FileInputStream fis = new FileInputStream(object);
|
||||
try {
|
||||
long skipBytes = realPartSize * i;
|
||||
AliyunOSSUtils.skipFully(fis, skipBytes);
|
||||
long size = (realPartSize < dataLen - skipBytes) ?
|
||||
realPartSize : dataLen - skipBytes;
|
||||
UploadPartRequest uploadPartRequest = new UploadPartRequest();
|
||||
uploadPartRequest.setBucketName(bucketName);
|
||||
uploadPartRequest.setKey(key);
|
||||
uploadPartRequest.setUploadId(uploadId);
|
||||
uploadPartRequest.setInputStream(fis);
|
||||
uploadPartRequest.setPartSize(size);
|
||||
uploadPartRequest.setPartNumber(i + 1);
|
||||
UploadPartResult uploadPartResult =
|
||||
ossClient.uploadPart(uploadPartRequest);
|
||||
statistics.incrementWriteOps(1);
|
||||
partETags.add(uploadPartResult.getPartETag());
|
||||
} finally {
|
||||
fis.close();
|
||||
}
|
||||
}
|
||||
CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
||||
new CompleteMultipartUploadRequest(bucketName, key,
|
||||
uploadId, partETags);
|
||||
CompleteMultipartUploadResult completeMultipartUploadResult =
|
||||
ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
||||
LOG.debug(completeMultipartUploadResult.getETag());
|
||||
} catch (OSSException | ClientException e) {
|
||||
AbortMultipartUploadRequest abortMultipartUploadRequest =
|
||||
new AbortMultipartUploadRequest(bucketName, key, uploadId);
|
||||
ossClient.abortMultipartUpload(abortMultipartUploadRequest);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* list objects.
|
||||
*
|
||||
|
@ -656,4 +584,83 @@ public class AliyunOSSFileSystemStore {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
public PartETag uploadPart(File file, String key, String uploadId, int idx)
|
||||
throws IOException {
|
||||
InputStream instream = null;
|
||||
Exception caught = null;
|
||||
int tries = 3;
|
||||
while (tries > 0) {
|
||||
try {
|
||||
instream = new FileInputStream(file);
|
||||
UploadPartRequest uploadRequest = new UploadPartRequest();
|
||||
uploadRequest.setBucketName(bucketName);
|
||||
uploadRequest.setKey(key);
|
||||
uploadRequest.setUploadId(uploadId);
|
||||
uploadRequest.setInputStream(instream);
|
||||
uploadRequest.setPartSize(file.length());
|
||||
uploadRequest.setPartNumber(idx);
|
||||
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
|
||||
return uploadResult.getPartETag();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Failed to upload "+ file.getPath() +", " +
|
||||
"try again.", e);
|
||||
caught = e;
|
||||
} finally {
|
||||
if (instream != null) {
|
||||
instream.close();
|
||||
instream = null;
|
||||
}
|
||||
}
|
||||
tries--;
|
||||
}
|
||||
|
||||
assert (caught != null);
|
||||
throw new IOException("Failed to upload " + file.getPath() +
|
||||
" for 3 times.", caught);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate multipart upload.
|
||||
*/
|
||||
public String getUploadId(String key) {
|
||||
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
||||
new InitiateMultipartUploadRequest(bucketName, key);
|
||||
InitiateMultipartUploadResult initiateMultipartUploadResult =
|
||||
ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
||||
return initiateMultipartUploadResult.getUploadId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete the specific multipart upload.
|
||||
*/
|
||||
public CompleteMultipartUploadResult completeMultipartUpload(String key,
|
||||
String uploadId, List<PartETag> partETags) {
|
||||
Collections.sort(partETags, new PartNumberAscendComparator());
|
||||
CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
||||
new CompleteMultipartUploadRequest(bucketName, key, uploadId,
|
||||
partETags);
|
||||
return ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort the specific multipart upload.
|
||||
*/
|
||||
public void abortMultipartUpload(String key, String uploadId) {
|
||||
AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
|
||||
bucketName, key, uploadId);
|
||||
ossClient.abortMultipartUpload(request);
|
||||
}
|
||||
|
||||
private static class PartNumberAscendComparator
|
||||
implements Comparator<PartETag>, Serializable {
|
||||
@Override
|
||||
public int compare(PartETag o1, PartETag o2) {
|
||||
if (o1.getPartNumber() > o2.getPartNumber()) {
|
||||
return 1;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,111 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
||||
|
||||
/**
|
||||
* The output stream for OSS blob system.
|
||||
* Data will be buffered on local disk, then uploaded to OSS in
|
||||
* {@link #close()} method.
|
||||
*/
|
||||
public class AliyunOSSOutputStream extends OutputStream {
|
||||
public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
|
||||
private AliyunOSSFileSystemStore store;
|
||||
private final String key;
|
||||
private Statistics statistics;
|
||||
private Progressable progress;
|
||||
private long partSizeThreshold;
|
||||
private LocalDirAllocator dirAlloc;
|
||||
private boolean closed;
|
||||
private File tmpFile;
|
||||
private BufferedOutputStream backupStream;
|
||||
|
||||
public AliyunOSSOutputStream(Configuration conf,
|
||||
AliyunOSSFileSystemStore store, String key, Progressable progress,
|
||||
Statistics statistics) throws IOException {
|
||||
this.store = store;
|
||||
this.key = key;
|
||||
// The caller cann't get any progress information
|
||||
this.progress = progress;
|
||||
this.statistics = statistics;
|
||||
partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
|
||||
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
|
||||
|
||||
if (conf.get(BUFFER_DIR_KEY) == null) {
|
||||
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
|
||||
}
|
||||
dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||
|
||||
tmpFile = dirAlloc.createTmpFileForWrite("output-",
|
||||
LocalDirAllocator.SIZE_UNKNOWN, conf);
|
||||
backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
|
||||
closed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
if (backupStream != null) {
|
||||
backupStream.close();
|
||||
}
|
||||
long dataLen = tmpFile.length();
|
||||
try {
|
||||
if (dataLen <= partSizeThreshold) {
|
||||
store.uploadObject(key, tmpFile);
|
||||
} else {
|
||||
store.multipartUploadObject(key, tmpFile);
|
||||
}
|
||||
} finally {
|
||||
if (!tmpFile.delete()) {
|
||||
LOG.warn("Can not delete file: " + tmpFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized void flush() throws IOException {
|
||||
backupStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void write(int b) throws IOException {
|
||||
backupStream.write(b);
|
||||
statistics.incrementBytesWritten(1);
|
||||
}
|
||||
|
||||
}
|
|
@ -18,12 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.fs.aliyun.oss;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import com.aliyun.oss.common.auth.CredentialsProvider;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -36,6 +38,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
|||
final public class AliyunOSSUtils {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AliyunOSSUtils.class);
|
||||
private static LocalDirAllocator directoryAllocator;
|
||||
|
||||
private AliyunOSSUtils() {
|
||||
}
|
||||
|
@ -74,31 +77,6 @@ final public class AliyunOSSUtils {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip the requested number of bytes or fail if there are no enough bytes
|
||||
* left. This allows for the possibility that {@link InputStream#skip(long)}
|
||||
* may not skip as many bytes as requested (most likely because of reaching
|
||||
* EOF).
|
||||
*
|
||||
* @param is the input stream to skip.
|
||||
* @param n the number of bytes to skip.
|
||||
* @throws IOException thrown when skipped less number of bytes.
|
||||
*/
|
||||
public static void skipFully(InputStream is, long n) throws IOException {
|
||||
long total = 0;
|
||||
long cur = 0;
|
||||
|
||||
do {
|
||||
cur = is.skip(n - total);
|
||||
total += cur;
|
||||
} while((total < n) && (cur > 0));
|
||||
|
||||
if (total < n) {
|
||||
throw new IOException("Failed to skip " + n + " bytes, possibly due " +
|
||||
"to EOF.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate a proper size of multipart piece. If <code>minPartSize</code>
|
||||
* is too small, the number of multipart pieces may exceed the limit of
|
||||
|
@ -126,7 +104,7 @@ final public class AliyunOSSUtils {
|
|||
throws IOException {
|
||||
CredentialsProvider credentials;
|
||||
|
||||
String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
|
||||
String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
|
||||
if (StringUtils.isEmpty(className)) {
|
||||
Configuration newConf =
|
||||
ProviderUtils.excludeIncompatibleCredentialProviders(conf,
|
||||
|
@ -151,7 +129,7 @@ final public class AliyunOSSUtils {
|
|||
throw new IOException(String.format("%s constructor exception. A " +
|
||||
"class specified in %s must provide an accessible constructor " +
|
||||
"accepting URI and Configuration, or an accessible default " +
|
||||
"constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
|
||||
"constructor.", className, CREDENTIALS_PROVIDER_KEY),
|
||||
e);
|
||||
} catch (ReflectiveOperationException | IllegalArgumentException e) {
|
||||
throw new IOException(className + " instantiation exception.", e);
|
||||
|
@ -188,4 +166,85 @@ final public class AliyunOSSUtils {
|
|||
final long size) {
|
||||
return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Demand create the directory allocator, then create a temporary file.
|
||||
* @param path prefix for the temporary file
|
||||
* @param size the size of the file that is going to be written
|
||||
* @param conf the Configuration object
|
||||
* @return a unique temporary file
|
||||
* @throws IOException IO problems
|
||||
*/
|
||||
public static File createTmpFileForWrite(String path, long size,
|
||||
Configuration conf) throws IOException {
|
||||
if (conf.get(BUFFER_DIR_KEY) == null) {
|
||||
conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
|
||||
}
|
||||
if (directoryAllocator == null) {
|
||||
directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||
}
|
||||
return directoryAllocator.createTmpFileForWrite(path, size, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a integer option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static int intOption(Configuration conf, String key, int defVal, int min) {
|
||||
int v = conf.getInt(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
LOG.debug("Value of {} is {}", key, v);
|
||||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a long option >= the minimum allowed value.
|
||||
* @param conf configuration
|
||||
* @param key key to look up
|
||||
* @param defVal default value
|
||||
* @param min minimum value
|
||||
* @return the value
|
||||
* @throws IllegalArgumentException if the value is below the minimum
|
||||
*/
|
||||
static long longOption(Configuration conf, String key, long defVal,
|
||||
long min) {
|
||||
long v = conf.getLong(key, defVal);
|
||||
Preconditions.checkArgument(v >= min,
|
||||
String.format("Value of %s: %d is below the minimum value %d",
|
||||
key, v, min));
|
||||
LOG.debug("Value of {} is {}", key, v);
|
||||
return v;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a size property from the configuration: this property must
|
||||
* be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
|
||||
* If it is too small, it is rounded up to that minimum, and a warning
|
||||
* printed.
|
||||
* @param conf configuration
|
||||
* @param property property name
|
||||
* @param defVal default value
|
||||
* @return the value, guaranteed to be above the minimum size
|
||||
*/
|
||||
public static long getMultipartSizeProperty(Configuration conf,
|
||||
String property, long defVal) {
|
||||
long partSize = conf.getLong(property, defVal);
|
||||
if (partSize < MULTIPART_MIN_SIZE) {
|
||||
LOG.warn("{} must be at least 100 KB; configured value is {}",
|
||||
property, partSize);
|
||||
partSize = MULTIPART_MIN_SIZE;
|
||||
} else if (partSize > Integer.MAX_VALUE) {
|
||||
LOG.warn("oss: {} capped to ~2.14GB(maximum allowed size with " +
|
||||
"current output mechanism)", MULTIPART_UPLOAD_PART_SIZE_KEY);
|
||||
partSize = Integer.MAX_VALUE;
|
||||
}
|
||||
return partSize;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ public final class Constants {
|
|||
VersionInfoUtils.getDefaultUserAgent();
|
||||
|
||||
// Class of credential provider
|
||||
public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
|
||||
public static final String CREDENTIALS_PROVIDER_KEY =
|
||||
"fs.oss.credentials.provider";
|
||||
|
||||
// OSS access verification
|
||||
|
@ -82,10 +82,14 @@ public final class Constants {
|
|||
public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
|
||||
|
||||
// Size of each multipart piece in bytes
|
||||
public static final String MULTIPART_UPLOAD_SIZE_KEY =
|
||||
public static final String MULTIPART_UPLOAD_PART_SIZE_KEY =
|
||||
"fs.oss.multipart.upload.size";
|
||||
public static final long MULTIPART_UPLOAD_PART_SIZE_DEFAULT =
|
||||
104857600; // 100 MB
|
||||
|
||||
/** The minimum multipart size which Aliyun OSS supports. */
|
||||
public static final int MULTIPART_MIN_SIZE = 100 * 1024;
|
||||
|
||||
public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
|
||||
public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
|
||||
|
||||
// Minimum size in bytes before we start a multipart uploads or copy
|
||||
|
@ -96,7 +100,6 @@ public final class Constants {
|
|||
|
||||
public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
|
||||
"fs.oss.multipart.download.size";
|
||||
|
||||
public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024;
|
||||
|
||||
public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY =
|
||||
|
@ -139,9 +142,14 @@ public final class Constants {
|
|||
|
||||
public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
|
||||
public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
|
||||
|
||||
public static final String FS_OSS = "oss";
|
||||
|
||||
public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
|
||||
public static final int MAX_RETRIES = 10;
|
||||
public static final String KEEPALIVE_TIME_KEY =
|
||||
"fs.oss.threads.keepalivetime";
|
||||
public static final int KEEPALIVE_TIME_DEFAULT = 60;
|
||||
|
||||
public static final String UPLOAD_ACTIVE_BLOCKS_KEY =
|
||||
"fs.oss.upload.active.blocks";
|
||||
public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4;
|
||||
}
|
||||
|
|
|
@ -30,10 +30,13 @@ import org.junit.rules.Timeout;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* Tests regular and multi-part upload functionality for AliyunOSSOutputStream.
|
||||
* Tests regular and multi-part upload functionality for
|
||||
* AliyunOSSBlockOutputStream.
|
||||
*/
|
||||
public class TestAliyunOSSOutputStream {
|
||||
public class TestAliyunOSSBlockOutputStream {
|
||||
private FileSystem fs;
|
||||
private static String testRootPath =
|
||||
AliyunOSSTestUtils.generateUniqueTestPath();
|
||||
|
@ -45,7 +48,7 @@ public class TestAliyunOSSOutputStream {
|
|||
public void setUp() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
|
||||
conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024);
|
||||
conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
|
||||
fs = AliyunOSSTestUtils.createTestFileSystem(conf);
|
||||
}
|
||||
|
||||
|
@ -56,18 +59,39 @@ public class TestAliyunOSSOutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
protected Path getTestPath() {
|
||||
private Path getTestPath() {
|
||||
return new Path(testRootPath + "/test-aliyun-oss");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZeroByteUpload() throws IOException {
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegularUpload() throws IOException {
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiPartUpload() throws IOException {
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
6 * 1024 * 1024 - 1);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
6 * 1024 * 1024 + 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHugeUpload() throws IOException {
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
|
||||
MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
|
||||
}
|
||||
|
||||
@Test
|
|
@ -33,7 +33,7 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
|
|||
protected Configuration createConfiguration() {
|
||||
Configuration newConf = super.createConfiguration();
|
||||
newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
|
||||
newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING);
|
||||
newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
|
||||
return newConf;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue