diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java index b46c67aa5e7..58c14a943bc 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java @@ -35,8 +35,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*; public class AliyunCredentialsProvider implements CredentialsProvider { private Credentials credentials = null; - public AliyunCredentialsProvider(Configuration conf) - throws IOException { + public AliyunCredentialsProvider(Configuration conf) throws IOException { String accessKeyId; String accessKeySecret; String securityToken; diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java new file mode 100644 index 00000000000..2d9a13bd56d --- /dev/null +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.aliyun.oss; + +import com.aliyun.oss.model.PartETag; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + +/** + * Asynchronous multi-part based uploading mechanism to support huge file + * which is larger than 5GB. Data will be buffered on local disk, then uploaded + * to OSS in {@link #close()} method. + */ +public class AliyunOSSBlockOutputStream extends OutputStream { + private static final Logger LOG = + LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class); + private AliyunOSSFileSystemStore store; + private Configuration conf; + private boolean closed; + private String key; + private File blockFile; + private List blockFiles = new ArrayList<>(); + private long blockSize; + private int blockId = 0; + private long blockWritten = 0L; + private String uploadId = null; + private final List> partETagsFutures; + private final ListeningExecutorService executorService; + private OutputStream blockStream; + private final byte[] singleByte = new byte[1]; + + public AliyunOSSBlockOutputStream(Configuration conf, + AliyunOSSFileSystemStore store, + String key, + Long blockSize, + ExecutorService executorService) throws IOException { + this.store = store; + this.conf = conf; + this.key = key; + this.blockSize = blockSize; + this.blockFile = newBlockFile(); + this.blockStream = + new BufferedOutputStream(new FileOutputStream(blockFile)); + this.partETagsFutures = new ArrayList<>(2); + this.executorService = MoreExecutors.listeningDecorator(executorService); + } + + private File newBlockFile() throws IOException { + return AliyunOSSUtils.createTmpFileForWrite( + String.format("oss-block-%04d-", blockId), blockSize, conf); + } + + @Override + public synchronized void flush() throws IOException { + blockStream.flush(); + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + blockStream.flush(); + blockStream.close(); + if (!blockFiles.contains(blockFile)) { + blockFiles.add(blockFile); + } + + try { + if (blockFiles.size() == 1) { + // just upload it directly + store.uploadObject(key, blockFile); + } else { + if (blockWritten > 0) { + ListenableFuture partETagFuture = + executorService.submit(new Callable() { + @Override + public PartETag call() throws Exception { + PartETag partETag = store.uploadPart(blockFile, key, uploadId, + blockId + 1); + return partETag; + } + }); + partETagsFutures.add(partETagFuture); + } + // wait for the partial uploads to finish + final List partETags = waitForAllPartUploads(); + if (null == partETags) { + throw new IOException("Failed to multipart upload to oss, abort it."); + } + store.completeMultipartUpload(key, uploadId, partETags); + } + } finally { + for (File tFile: blockFiles) { + if (tFile.exists() && !tFile.delete()) { + LOG.warn("Failed to delete temporary file {}", tFile); + } + } + closed = true; + } + } + + @Override + public void write(int b) throws IOException { + singleByte[0] = (byte)b; + write(singleByte, 0, 1); + } + + @Override + public synchronized void write(byte[] b, int off, int len) + throws IOException { + if (closed) { + throw new IOException("Stream closed."); + } + try { + blockStream.write(b, off, len); + blockWritten += len; + if (blockWritten >= blockSize) { + uploadCurrentPart(); + blockWritten = 0L; + } + } finally { + for (File tFile: blockFiles) { + if (tFile.exists() && !tFile.delete()) { + LOG.warn("Failed to delete temporary file {}", tFile); + } + } + } + } + + private void uploadCurrentPart() throws IOException { + blockFiles.add(blockFile); + blockStream.flush(); + blockStream.close(); + if (blockId == 0) { + uploadId = store.getUploadId(key); + } + ListenableFuture partETagFuture = + executorService.submit(new Callable() { + @Override + public PartETag call() throws Exception { + PartETag partETag = store.uploadPart(blockFile, key, uploadId, + blockId + 1); + return partETag; + } + }); + partETagsFutures.add(partETagFuture); + blockFile = newBlockFile(); + blockId++; + blockStream = new BufferedOutputStream(new FileOutputStream(blockFile)); + } + + /** + * Block awaiting all outstanding uploads to complete. + * @return list of results + * @throws IOException IO Problems + */ + private List waitForAllPartUploads() throws IOException { + LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size()); + try { + return Futures.allAsList(partETagsFutures).get(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted partUpload", ie); + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException ee) { + //there is no way of recovering so abort + //cancel all partUploads + LOG.debug("While waiting for upload completion", ee); + LOG.debug("Cancelling futures"); + for (ListenableFuture future : partETagsFutures) { + future.cancel(true); + } + //abort multipartupload + store.abortMultipartUpload(key, uploadId); + throw new IOException("Multi-part upload with id '" + uploadId + + "' to " + key, ee); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index ae8e491a5e7..73568180ebb 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -56,6 +56,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.intOption; +import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.longOption; import static org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils.objectRepresentsDirectory; import static org.apache.hadoop.fs.aliyun.oss.Constants.*; @@ -69,6 +71,7 @@ public class AliyunOSSFileSystem extends FileSystem { private URI uri; private String bucket; private Path workingDir; + private int blockOutputActiveBlocks; private AliyunOSSFileSystemStore store; private int maxKeys; private int maxReadAheadPartNumber; @@ -125,8 +128,15 @@ public class AliyunOSSFileSystem extends FileSystem { // this means the file is not found } - return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(), - store, key, progress, statistics), (Statistics)(null)); + long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(), + MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT); + return new FSDataOutputStream( + new AliyunOSSBlockOutputStream(getConf(), + store, + key, + uploadPartSize, + new SemaphoredDelegatingExecutor(boundedThreadPool, + blockOutputActiveBlocks, true)), (Statistics)(null)); } /** @@ -149,9 +159,8 @@ public class AliyunOSSFileSystem extends FileSystem { throw new FileAlreadyExistsException("Not a directory: " + parent); } } - return create(path, permission, - flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); + return create(path, permission, flags.contains(CreateFlag.OVERWRITE), + bufferSize, replication, blockSize, progress); } @Override @@ -270,7 +279,7 @@ public class AliyunOSSFileSystem extends FileSystem { } } else if (objectRepresentsDirectory(key, meta.getContentLength())) { return new FileStatus(0, true, 1, 0, meta.getLastModified().getTime(), - qualifiedPath); + qualifiedPath); } else { return new FileStatus(meta.getContentLength(), false, 1, getDefaultBlockSize(path), meta.getLastModified().getTime(), @@ -318,6 +327,10 @@ public class AliyunOSSFileSystem extends FileSystem { uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority()); workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(uri, null); + long keepAliveTime = longOption(conf, + KEEPALIVE_TIME_KEY, KEEPALIVE_TIME_DEFAULT, 0); + blockOutputActiveBlocks = intOption(conf, + UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1); store = new AliyunOSSFileSystemStore(); store.initialize(name, conf, statistics); @@ -335,7 +348,8 @@ public class AliyunOSSFileSystem extends FileSystem { Constants.MULTIPART_DOWNLOAD_AHEAD_PART_MAX_NUM_DEFAULT); this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - threadNum, totalTasks, 60L, TimeUnit.SECONDS, "oss-read-shared"); + threadNum, totalTasks, keepAliveTime, TimeUnit.SECONDS, + "oss-transfer-shared"); maxConcurrentCopyTasksPerDir = AliyunOSSUtils.intPositiveOption(conf, Constants.MAX_CONCURRENT_COPY_TASKS_PER_DIR_KEY, diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index c779d96bc04..4036215f6ca 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.fs.aliyun.oss; import com.aliyun.oss.ClientConfiguration; @@ -62,8 +63,11 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; @@ -83,7 +87,6 @@ public class AliyunOSSFileSystemStore { private String bucketName; private long uploadPartSize; private long multipartThreshold; - private long partSize; private int maxKeys; private String serverSideEncryptionAlgorithm; @@ -143,28 +146,18 @@ public class AliyunOSSFileSystemStore { String endPoint = conf.getTrimmed(ENDPOINT_KEY, ""); if (StringUtils.isEmpty(endPoint)) { throw new IllegalArgumentException("Aliyun OSS endpoint should not be " + - "null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); + "null or empty. Please set proper endpoint with 'fs.oss.endpoint'."); } CredentialsProvider provider = AliyunOSSUtils.getCredentialsProvider(conf); ossClient = new OSSClient(endPoint, provider, clientConf); - uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); + uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf, + MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT); multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY, - MULTIPART_UPLOAD_SIZE_DEFAULT); - if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) { - partSize = MIN_MULTIPART_UPLOAD_PART_SIZE; - } serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); - if (uploadPartSize < 5 * 1024 * 1024) { - LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB"); - uploadPartSize = 5 * 1024 * 1024; - } - if (multipartThreshold < 5 * 1024 * 1024) { LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); multipartThreshold = 5 * 1024 * 1024; @@ -419,71 +412,6 @@ public class AliyunOSSFileSystemStore { } } - /** - * Upload a file as an OSS object, using multipart upload. - * - * @param key object key. - * @param file local file to upload. - * @throws IOException if failed to upload object. - */ - public void multipartUploadObject(String key, File file) throws IOException { - File object = file.getAbsoluteFile(); - long dataLen = object.length(); - long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize); - int partNum = (int) (dataLen / realPartSize); - if (dataLen % realPartSize != 0) { - partNum += 1; - } - - InitiateMultipartUploadRequest initiateMultipartUploadRequest = - new InitiateMultipartUploadRequest(bucketName, key); - ObjectMetadata meta = new ObjectMetadata(); - if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) { - meta.setServerSideEncryption(serverSideEncryptionAlgorithm); - } - initiateMultipartUploadRequest.setObjectMetadata(meta); - InitiateMultipartUploadResult initiateMultipartUploadResult = - ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); - List partETags = new ArrayList(); - String uploadId = initiateMultipartUploadResult.getUploadId(); - - try { - for (int i = 0; i < partNum; i++) { - // TODO: Optimize this, avoid opening the object multiple times - FileInputStream fis = new FileInputStream(object); - try { - long skipBytes = realPartSize * i; - AliyunOSSUtils.skipFully(fis, skipBytes); - long size = (realPartSize < dataLen - skipBytes) ? - realPartSize : dataLen - skipBytes; - UploadPartRequest uploadPartRequest = new UploadPartRequest(); - uploadPartRequest.setBucketName(bucketName); - uploadPartRequest.setKey(key); - uploadPartRequest.setUploadId(uploadId); - uploadPartRequest.setInputStream(fis); - uploadPartRequest.setPartSize(size); - uploadPartRequest.setPartNumber(i + 1); - UploadPartResult uploadPartResult = - ossClient.uploadPart(uploadPartRequest); - statistics.incrementWriteOps(1); - partETags.add(uploadPartResult.getPartETag()); - } finally { - fis.close(); - } - } - CompleteMultipartUploadRequest completeMultipartUploadRequest = - new CompleteMultipartUploadRequest(bucketName, key, - uploadId, partETags); - CompleteMultipartUploadResult completeMultipartUploadResult = - ossClient.completeMultipartUpload(completeMultipartUploadRequest); - LOG.debug(completeMultipartUploadResult.getETag()); - } catch (OSSException | ClientException e) { - AbortMultipartUploadRequest abortMultipartUploadRequest = - new AbortMultipartUploadRequest(bucketName, key, uploadId); - ossClient.abortMultipartUpload(abortMultipartUploadRequest); - } - } - /** * list objects. * @@ -494,7 +422,7 @@ public class AliyunOSSFileSystemStore { * @return a list of matches. */ public ObjectListing listObjects(String prefix, int maxListingLength, - String marker, boolean recursive) { + String marker, boolean recursive) { String delimiter = recursive ? null : "/"; prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix); ListObjectsRequest listRequest = new ListObjectsRequest(bucketName); @@ -657,4 +585,83 @@ public class AliyunOSSFileSystemStore { } }; } + + public PartETag uploadPart(File file, String key, String uploadId, int idx) + throws IOException { + InputStream instream = null; + Exception caught = null; + int tries = 3; + while (tries > 0) { + try { + instream = new FileInputStream(file); + UploadPartRequest uploadRequest = new UploadPartRequest(); + uploadRequest.setBucketName(bucketName); + uploadRequest.setKey(key); + uploadRequest.setUploadId(uploadId); + uploadRequest.setInputStream(instream); + uploadRequest.setPartSize(file.length()); + uploadRequest.setPartNumber(idx); + UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest); + return uploadResult.getPartETag(); + } catch (Exception e) { + LOG.debug("Failed to upload "+ file.getPath() +", " + + "try again.", e); + caught = e; + } finally { + if (instream != null) { + instream.close(); + instream = null; + } + } + tries--; + } + + assert (caught != null); + throw new IOException("Failed to upload " + file.getPath() + + " for 3 times.", caught); + } + + /** + * Initiate multipart upload. + */ + public String getUploadId(String key) { + InitiateMultipartUploadRequest initiateMultipartUploadRequest = + new InitiateMultipartUploadRequest(bucketName, key); + InitiateMultipartUploadResult initiateMultipartUploadResult = + ossClient.initiateMultipartUpload(initiateMultipartUploadRequest); + return initiateMultipartUploadResult.getUploadId(); + } + + /** + * Complete the specific multipart upload. + */ + public CompleteMultipartUploadResult completeMultipartUpload(String key, + String uploadId, List partETags) { + Collections.sort(partETags, new PartNumberAscendComparator()); + CompleteMultipartUploadRequest completeMultipartUploadRequest = + new CompleteMultipartUploadRequest(bucketName, key, uploadId, + partETags); + return ossClient.completeMultipartUpload(completeMultipartUploadRequest); + } + + /** + * Abort the specific multipart upload. + */ + public void abortMultipartUpload(String key, String uploadId) { + AbortMultipartUploadRequest request = new AbortMultipartUploadRequest( + bucketName, key, uploadId); + ossClient.abortMultipartUpload(request); + } + + private static class PartNumberAscendComparator + implements Comparator, Serializable { + @Override + public int compare(PartETag o1, PartETag o2) { + if (o1.getPartNumber() > o2.getPartNumber()) { + return 1; + } else { + return -1; + } + } + } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java deleted file mode 100644 index c952d0ae858..00000000000 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.aliyun.oss; - -import static org.apache.hadoop.fs.aliyun.oss.Constants.*; - -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.util.Progressable; - -/** - * The output stream for OSS blob system. - * Data will be buffered on local disk, then uploaded to OSS in - * {@link #close()} method. - */ -public class AliyunOSSOutputStream extends OutputStream { - public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class); - private AliyunOSSFileSystemStore store; - private final String key; - private Statistics statistics; - private Progressable progress; - private long partSizeThreshold; - private LocalDirAllocator dirAlloc; - private boolean closed; - private File tmpFile; - private BufferedOutputStream backupStream; - - public AliyunOSSOutputStream(Configuration conf, - AliyunOSSFileSystemStore store, String key, Progressable progress, - Statistics statistics) throws IOException { - this.store = store; - this.key = key; - // The caller cann't get any progress information - this.progress = progress; - this.statistics = statistics; - partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); - - if (conf.get(BUFFER_DIR_KEY) == null) { - conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); - } - dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); - - tmpFile = dirAlloc.createTmpFileForWrite("output-", - LocalDirAllocator.SIZE_UNKNOWN, conf); - backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile)); - closed = false; - } - - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - closed = true; - if (backupStream != null) { - backupStream.close(); - } - long dataLen = tmpFile.length(); - try { - if (dataLen <= partSizeThreshold) { - store.uploadObject(key, tmpFile); - } else { - store.multipartUploadObject(key, tmpFile); - } - } finally { - if (!tmpFile.delete()) { - LOG.warn("Can not delete file: " + tmpFile); - } - } - } - - - - @Override - public synchronized void flush() throws IOException { - backupStream.flush(); - } - - @Override - public synchronized void write(int b) throws IOException { - backupStream.write(b); - statistics.incrementBytesWritten(1); - } - -} diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java index 1a2160889a1..2fe06c1b05f 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java @@ -18,12 +18,14 @@ package org.apache.hadoop.fs.aliyun.oss; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import com.aliyun.oss.common.auth.CredentialsProvider; +import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.security.ProviderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.*; final public class AliyunOSSUtils { private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSUtils.class); + private static LocalDirAllocator directoryAllocator; private AliyunOSSUtils() { } @@ -74,31 +77,6 @@ final public class AliyunOSSUtils { } } - /** - * Skip the requested number of bytes or fail if there are no enough bytes - * left. This allows for the possibility that {@link InputStream#skip(long)} - * may not skip as many bytes as requested (most likely because of reaching - * EOF). - * - * @param is the input stream to skip. - * @param n the number of bytes to skip. - * @throws IOException thrown when skipped less number of bytes. - */ - public static void skipFully(InputStream is, long n) throws IOException { - long total = 0; - long cur = 0; - - do { - cur = is.skip(n - total); - total += cur; - } while((total < n) && (cur > 0)); - - if (total < n) { - throw new IOException("Failed to skip " + n + " bytes, possibly due " + - "to EOF."); - } - } - /** * Calculate a proper size of multipart piece. If minPartSize * is too small, the number of multipart pieces may exceed the limit of @@ -126,7 +104,7 @@ final public class AliyunOSSUtils { throws IOException { CredentialsProvider credentials; - String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY); + String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY); if (StringUtils.isEmpty(className)) { Configuration newConf = ProviderUtils.excludeIncompatibleCredentialProviders(conf, @@ -151,7 +129,7 @@ final public class AliyunOSSUtils { throw new IOException(String.format("%s constructor exception. A " + "class specified in %s must provide an accessible constructor " + "accepting URI and Configuration, or an accessible default " + - "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY), + "constructor.", className, CREDENTIALS_PROVIDER_KEY), e); } catch (ReflectiveOperationException | IllegalArgumentException e) { throw new IOException(className + " instantiation exception.", e); @@ -188,4 +166,85 @@ final public class AliyunOSSUtils { final long size) { return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L; } + + /** + * Demand create the directory allocator, then create a temporary file. + * @param path prefix for the temporary file + * @param size the size of the file that is going to be written + * @param conf the Configuration object + * @return a unique temporary file + * @throws IOException IO problems + */ + public static File createTmpFileForWrite(String path, long size, + Configuration conf) throws IOException { + if (conf.get(BUFFER_DIR_KEY) == null) { + conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss"); + } + if (directoryAllocator == null) { + directoryAllocator = new LocalDirAllocator(BUFFER_DIR_KEY); + } + return directoryAllocator.createTmpFileForWrite(path, size, conf); + } + + /** + * Get a integer option >= the minimum allowed value. + * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static int intOption(Configuration conf, String key, int defVal, int min) { + int v = conf.getInt(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + LOG.debug("Value of {} is {}", key, v); + return v; + } + + /** + * Get a long option >= the minimum allowed value. + * @param conf configuration + * @param key key to look up + * @param defVal default value + * @param min minimum value + * @return the value + * @throws IllegalArgumentException if the value is below the minimum + */ + static long longOption(Configuration conf, String key, long defVal, + long min) { + long v = conf.getLong(key, defVal); + Preconditions.checkArgument(v >= min, + String.format("Value of %s: %d is below the minimum value %d", + key, v, min)); + LOG.debug("Value of {} is {}", key, v); + return v; + } + + /** + * Get a size property from the configuration: this property must + * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}. + * If it is too small, it is rounded up to that minimum, and a warning + * printed. + * @param conf configuration + * @param property property name + * @param defVal default value + * @return the value, guaranteed to be above the minimum size + */ + public static long getMultipartSizeProperty(Configuration conf, + String property, long defVal) { + long partSize = conf.getLong(property, defVal); + if (partSize < MULTIPART_MIN_SIZE) { + LOG.warn("{} must be at least 100 KB; configured value is {}", + property, partSize); + partSize = MULTIPART_MIN_SIZE; + } else if (partSize > Integer.MAX_VALUE) { + LOG.warn("oss: {} capped to ~2.14GB(maximum allowed size with " + + "current output mechanism)", MULTIPART_UPLOAD_PART_SIZE_KEY); + partSize = Integer.MAX_VALUE; + } + return partSize; + } } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java index 283927c828e..21b2c0dc5dd 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java @@ -31,10 +31,10 @@ public final class Constants { // User agent public static final String USER_AGENT_PREFIX = "fs.oss.user.agent.prefix"; public static final String USER_AGENT_PREFIX_DEFAULT = - VersionInfoUtils.getDefaultUserAgent(); + VersionInfoUtils.getDefaultUserAgent(); // Class of credential provider - public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY = + public static final String CREDENTIALS_PROVIDER_KEY = "fs.oss.credentials.provider"; // OSS access verification @@ -82,10 +82,14 @@ public final class Constants { public static final int MAX_PAGING_KEYS_DEFAULT = 1000; // Size of each of or multipart pieces in bytes - public static final String MULTIPART_UPLOAD_SIZE_KEY = + public static final String MULTIPART_UPLOAD_PART_SIZE_KEY = "fs.oss.multipart.upload.size"; + public static final long MULTIPART_UPLOAD_PART_SIZE_DEFAULT = + 104857600; // 100 MB + + /** The minimum multipart size which Aliyun OSS supports. */ + public static final int MULTIPART_MIN_SIZE = 100 * 1024; - public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024; public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000; // Minimum size in bytes before we start a multipart uploads or copy @@ -96,7 +100,6 @@ public final class Constants { public static final String MULTIPART_DOWNLOAD_SIZE_KEY = "fs.oss.multipart.download.size"; - public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 512 * 1024; public static final String MULTIPART_DOWNLOAD_THREAD_NUMBER_KEY = @@ -139,9 +142,15 @@ public final class Constants { public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size"; public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024; + public static final String FS_OSS = "oss"; - public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L; - public static final int MAX_RETRIES = 10; + public static final String KEEPALIVE_TIME_KEY = + "fs.oss.threads.keepalivetime"; + public static final int KEEPALIVE_TIME_DEFAULT = 60; + + public static final String UPLOAD_ACTIVE_BLOCKS_KEY = + "fs.oss.upload.active.blocks"; + public static final int UPLOAD_ACTIVE_BLOCKS_DEFAULT = 4; } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java similarity index 70% rename from hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java rename to hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 6b87d9ca466..365d93142a3 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -30,10 +30,13 @@ import org.junit.rules.Timeout; import java.io.IOException; +import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; + /** - * Tests regular and multi-part upload functionality for AliyunOSSOutputStream. + * Tests regular and multi-part upload functionality for + * AliyunOSSBlockOutputStream. */ -public class TestAliyunOSSOutputStream { +public class TestAliyunOSSBlockOutputStream { private FileSystem fs; private static String testRootPath = AliyunOSSTestUtils.generateUniqueTestPath(); @@ -45,7 +48,7 @@ public class TestAliyunOSSOutputStream { public void setUp() throws Exception { Configuration conf = new Configuration(); conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); - conf.setInt(Constants.MULTIPART_UPLOAD_SIZE_KEY, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024); fs = AliyunOSSTestUtils.createTestFileSystem(conf); } @@ -56,18 +59,39 @@ public class TestAliyunOSSOutputStream { } } - protected Path getTestPath() { + private Path getTestPath() { return new Path(testRootPath + "/test-aliyun-oss"); } + @Test + public void testZeroByteUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0); + } + @Test public void testRegularUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1); } @Test public void testMultiPartUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + 6 * 1024 * 1024 - 1); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + 6 * 1024 * 1024 + 1); + } + + @Test + public void testHugeUpload() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + MULTIPART_UPLOAD_PART_SIZE_DEFAULT); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1); } @Test diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java index f0bb26d8e7d..c413f83b203 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSInputStream.java @@ -123,15 +123,15 @@ public class TestAliyunOSSInputStream { + fsDataInputStream.getPos(), fsDataInputStream.getPos() == 0); assertTrue("expected position at:" - + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" - + in.getExpectNextPos(), + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + + in.getExpectNextPos(), in.getExpectNextPos() == Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); fsDataInputStream.seek(4 * 1024 * 1024); assertTrue("expected position at:" + 4 * 1024 * 1024 - + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" - + in.getExpectNextPos(), + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT + ", but got:" + + in.getExpectNextPos(), in.getExpectNextPos() == 4 * 1024 * 1024 - + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); + + Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT); IOUtils.closeStream(fsDataInputStream); } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java index 18d09d54149..e9a98b3307f 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java @@ -33,7 +33,7 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest { protected Configuration createConfiguration() { Configuration newConf = super.createConfiguration(); newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); - newConf.setLong(MULTIPART_UPLOAD_SIZE_KEY, MULTIPART_SETTING); + newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING); return newConf; }