HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
commit 824c32de1a (parent 0a502c665b)
@@ -249,6 +249,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11620. Add support for load balancing across a group of KMS for HA.
     (Arun Suresh via wang)

+    HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
+
   BUG FIXES

     HADOOP-11512. Use getTrimmedStrings when reading serialization keys
@@ -755,13 +755,13 @@ for ldap providers in the same way as above does.
<property>
  <name>fs.s3a.connection.establish.timeout</name>
  <value>5000</value>
-  <description>Socket connection setup timeout in seconds.</description>
+  <description>Socket connection setup timeout in milliseconds.</description>
</property>

<property>
  <name>fs.s3a.connection.timeout</name>
  <value>50000</value>
-  <description>Socket connection timeout in seconds.</description>
+  <description>Socket connection timeout in milliseconds.</description>
</property>

<property>
@@ -837,6 +837,22 @@ for ldap providers in the same way as above does.
  uploads to.</description>
</property>

+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>false</value>
+  <description>Upload directly from memory instead of buffering to
+  disk first. Memory usage and parallelism can be controlled as up to
+  fs.s3a.multipart.size memory is consumed for each (part)upload actively
+  uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.buffer.size</name>
+  <value>1048576</value>
+  <description>Size of initial memory buffer in bytes allocated for an
+  upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
<property>
  <name>fs.s3a.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
@@ -83,6 +83,14 @@ public class Constants {
  // comma separated list of directories
  public static final String BUFFER_DIR = "fs.s3a.buffer.dir";

+  // should we upload directly from memory rather than using a file buffer
+  public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
+  public static final boolean DEFAULT_FAST_UPLOAD = false;
+
+  //initial size of memory buffer for a fast upload
+  public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
+  public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+
  // private | public-read | public-read-write | authenticated-read |
  // log-delivery-write | bucket-owner-read | bucket-owner-full-control
  public static final String CANNED_ACL = "fs.s3a.acl.default";
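For orientation only (this snippet is not part of the commit): the new keys are read through the standard `Configuration` API, which is how the rest of this patch consumes them — `S3AFileSystem.create()` checks `FAST_UPLOAD`, and `S3AFastOutputStream` sizes its first buffer from `FAST_BUFFER_SIZE`. The class name below is hypothetical.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

// Hypothetical illustration of setting and reading the new keys with their defaults.
public class FastUploadConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(Constants.FAST_UPLOAD, true);              // opt in to memory-based upload
    conf.setInt(Constants.FAST_BUFFER_SIZE, 2 * 1024 * 1024);  // start with a 2 MB buffer

    boolean fastUpload = conf.getBoolean(
        Constants.FAST_UPLOAD, Constants.DEFAULT_FAST_UPLOAD);
    int initialBufferSize = conf.getInt(
        Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
    System.out.println("fast.upload=" + fastUpload
        + ", fast.buffer.size=" + initialBufferSize + " bytes");
  }
}
```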
@@ -0,0 +1,413 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * Upload files/parts asap directly from a memory buffer (instead of buffering
 * to a file).
 * <p/>
 * Uploads are managed low-level rather than through the AWS TransferManager.
 * This allows for uploading each part of a multi-part upload as soon as
 * the bytes are in memory, rather than waiting until the file is closed.
 * <p/>
 * Unstable: statistics and error handling might evolve
 */
@InterfaceStability.Unstable
public class S3AFastOutputStream extends OutputStream {

  private static final Logger LOG = S3AFileSystem.LOG;
  private final String key;
  private final String bucket;
  private final AmazonS3Client client;
  private final int partSize;
  private final int multiPartThreshold;
  private final S3AFileSystem fs;
  private final CannedAccessControlList cannedACL;
  private final FileSystem.Statistics statistics;
  private final String serverSideEncryptionAlgorithm;
  private final ProgressListener progressListener;
  private final ListeningExecutorService executorService;
  private MultiPartUpload multiPartUpload;
  private boolean closed;
  private ByteArrayOutputStream buffer;
  private int bufferLimit;


  /**
   * Creates a fast OutputStream that uploads to S3 from memory.
   * For MultiPartUploads, as soon as sufficient bytes have been written to
   * the stream a part is uploaded immediately (by using the low-level
   * multi-part upload API on the AmazonS3Client).
   *
   * @param client AmazonS3Client used for S3 calls
   * @param fs S3AFilesystem
   * @param bucket S3 bucket name
   * @param key S3 key name
   * @param progress report progress in order to prevent timeouts
   * @param statistics track FileSystem.Statistics on the performed operations
   * @param cannedACL used CannedAccessControlList
   * @param serverSideEncryptionAlgorithm algorithm for server side encryption
   * @param partSize size of a single part in a multi-part upload (except
   * last part)
   * @param multiPartThreshold files at least this size use multi-part upload
   * @throws IOException
   */
  public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
      String bucket, String key, Progressable progress,
      FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
      String serverSideEncryptionAlgorithm, long partSize,
      long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
      throws IOException {
    this.bucket = bucket;
    this.key = key;
    this.client = client;
    this.fs = fs;
    this.cannedACL = cannedACL;
    this.statistics = statistics;
    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
    //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
    if (partSize > Integer.MAX_VALUE) {
      this.partSize = Integer.MAX_VALUE;
      LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
          "when using 'FAST_UPLOAD = true')");
    } else {
      this.partSize = (int) partSize;
    }
    if (multiPartThreshold > Integer.MAX_VALUE) {
      this.multiPartThreshold = Integer.MAX_VALUE;
      LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
          "allowed size when using 'FAST_UPLOAD = true')");
    } else {
      this.multiPartThreshold = (int) multiPartThreshold;
    }
    this.bufferLimit = this.multiPartThreshold;
    this.closed = false;
    int initialBufferSize = this.fs.getConf()
        .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
    if (initialBufferSize < 0) {
      LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
          "default value");
      initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
    } else if (initialBufferSize > this.bufferLimit) {
      LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
          "exceed MIN_MULTIPART_THRESHOLD");
      initialBufferSize = this.bufferLimit;
    }
    this.buffer = new ByteArrayOutputStream(initialBufferSize);
    this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
    this.multiPartUpload = null;
    this.progressListener = new ProgressableListener(progress);
    if (LOG.isDebugEnabled()){
      LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
          bucket, key);
    }
  }

  /**
   * Writes a byte to the memory buffer. If this causes the buffer to reach
   * its limit, the actual upload is submitted to the threadpool.
   * @param b the int of which the lowest byte is written
   * @throws IOException
   */
  @Override
  public synchronized void write(int b) throws IOException {
    buffer.write(b);
    if (buffer.size() == bufferLimit) {
      uploadBuffer();
    }
  }

  /**
   * Writes a range of bytes from to the memory buffer. If this causes the
   * buffer to reach its limit, the actual upload is submitted to the
   * threadpool and the remainder of the array is written to memory
   * (recursively).
   * @param b byte array containing
   * @param off offset in array where to start
   * @param len number of bytes to be written
   * @throws IOException
   */
  @Override
  public synchronized void write(byte b[], int off, int len)
      throws IOException {
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
        ((off + len) > b.length) || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }
    if (buffer.size() + len < bufferLimit) {
      buffer.write(b, off, len);
    } else {
      int firstPart = bufferLimit - buffer.size();
      buffer.write(b, off, firstPart);
      uploadBuffer();
      this.write(b, off + firstPart, len - firstPart);
    }
  }

  private synchronized void uploadBuffer() throws IOException {
    if (multiPartUpload == null) {
      multiPartUpload = initiateMultiPartUpload();
      /* Upload the existing buffer if it exceeds partSize. This possibly
      requires multiple parts! */
      final byte[] allBytes = buffer.toByteArray();
      buffer = null; //earlier gc?
      if (LOG.isDebugEnabled()) {
        LOG.debug("Total length of initial buffer: {}", allBytes.length);
      }
      int processedPos = 0;
      while ((multiPartThreshold - processedPos) >= partSize) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Initial buffer: processing from byte {} to byte {}",
              processedPos, (processedPos + partSize - 1));
        }
        multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
            processedPos, partSize), partSize);
        processedPos += partSize;
      }
      //resize and reset stream
      bufferLimit = partSize;
      buffer = new ByteArrayOutputStream(bufferLimit);
      buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
    } else {
      //upload next part
      multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
          .toByteArray()), partSize);
      buffer.reset();
    }
  }


  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    closed = true;
    try {
      if (multiPartUpload == null) {
        putObject();
      } else {
        if (buffer.size() > 0) {
          //send last part
          multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
              .toByteArray()), buffer.size());
        }
        final List<PartETag> partETags = multiPartUpload
            .waitForAllPartUploads();
        multiPartUpload.complete(partETags);
      }
      statistics.incrementWriteOps(1);
      // This will delete unnecessary fake parent directories
      fs.finishedWrite(key);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
      }
    } finally {
      buffer = null;
      super.close();
    }
  }

  private ObjectMetadata createDefaultMetadata() {
    ObjectMetadata om = new ObjectMetadata();
    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
    }
    return om;
  }

  private MultiPartUpload initiateMultiPartUpload() throws IOException {
    final ObjectMetadata om = createDefaultMetadata();
    final InitiateMultipartUploadRequest initiateMPURequest =
        new InitiateMultipartUploadRequest(bucket, key, om);
    initiateMPURequest.setCannedACL(cannedACL);
    try {
      return new MultiPartUpload(
          client.initiateMultipartUpload(initiateMPURequest).getUploadId());
    } catch (AmazonServiceException ase) {
      throw new IOException("Unable to initiate MultiPartUpload (server side)" +
          ": " + ase, ase);
    } catch (AmazonClientException ace) {
      throw new IOException("Unable to initiate MultiPartUpload (client side)" +
          ": " + ace, ace);
    }
  }

  private void putObject() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
          key);
    }
    final ObjectMetadata om = createDefaultMetadata();
    om.setContentLength(buffer.size());
    final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
        new ByteArrayInputStream(buffer.toByteArray()), om);
    putObjectRequest.setCannedAcl(cannedACL);
    putObjectRequest.setGeneralProgressListener(progressListener);
    ListenableFuture<PutObjectResult> putObjectResult =
        executorService.submit(new Callable<PutObjectResult>() {
          @Override
          public PutObjectResult call() throws Exception {
            return client.putObject(putObjectRequest);
          }
        });
    //wait for completion
    try {
      putObjectResult.get();
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted object upload:" + ie, ie);
      Thread.currentThread().interrupt();
    } catch (ExecutionException ee) {
      throw new IOException("Regular upload failed", ee.getCause());
    }
  }

  private class MultiPartUpload {
    private final String uploadId;
    private final List<ListenableFuture<PartETag>> partETagsFutures;

    public MultiPartUpload(String uploadId) {
      this.uploadId = uploadId;
      this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
            "id '{}'", bucket, key, uploadId);
      }
    }

    public void uploadPartAsync(ByteArrayInputStream inputStream,
        int partSize) {
      final int currentPartNumber = partETagsFutures.size() + 1;
      final UploadPartRequest request =
          new UploadPartRequest().withBucketName(bucket).withKey(key)
              .withUploadId(uploadId).withInputStream(inputStream)
              .withPartNumber(currentPartNumber).withPartSize(partSize);
      request.setGeneralProgressListener(progressListener);
      ListenableFuture<PartETag> partETagFuture =
          executorService.submit(new Callable<PartETag>() {
            @Override
            public PartETag call() throws Exception {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
                    uploadId);
              }
              return client.uploadPart(request).getPartETag();
            }
          });
      partETagsFutures.add(partETagFuture);
    }

    public List<PartETag> waitForAllPartUploads() throws IOException {
      try {
        return Futures.allAsList(partETagsFutures).get();
      } catch (InterruptedException ie) {
        LOG.warn("Interrupted partUpload:" + ie, ie);
        Thread.currentThread().interrupt();
      } catch (ExecutionException ee) {
        //there is no way of recovering so abort
        //cancel all partUploads
        for (ListenableFuture<PartETag> future : partETagsFutures) {
          future.cancel(true);
        }
        //abort multipartupload
        this.abort();
        throw new IOException("Part upload failed in multi-part upload with " +
            "id '" + uploadId + "':" + ee, ee);
      }
      //should not happen?
      return null;
    }

    public void complete(List<PartETag> partETags) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
            uploadId);
      }
      final CompleteMultipartUploadRequest completeRequest =
          new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
      client.completeMultipartUpload(completeRequest);
    }

    public void abort() {
      LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
      try {
        client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
            key, uploadId));
      } catch (Exception e2) {
        LOG.warn("Unable to abort multipart upload, you may need to purge " +
            "uploaded parts: " + e2, e2);
      }
    }
  }

  private static class ProgressableListener implements ProgressListener {
    private final Progressable progress;

    public ProgressableListener(Progressable progress) {
      this.progress = progress;
    }

    public void progressChanged(ProgressEvent progressEvent) {
      if (progress != null) {
        progress.progress();
      }
    }
  }
}
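A worked example of the buffering logic above (the sizes are illustrative, not defaults): with `fs.s3a.multipart.threshold` = 16 MB and `fs.s3a.multipart.size` = 5 MB, the stream buffers writes in memory with `bufferLimit` initially set to 16 MB. When that limit is reached, `uploadBuffer()` initiates a multi-part upload, submits three 5 MB parts from the initial buffer asynchronously, carries the remaining 1 MB into a fresh 5 MB buffer, and from then on uploads a 5 MB part each time the buffer fills. On `close()`, any remainder is sent as the final part and the upload is completed; a stream that never reaches the threshold is sent as a single `putObject()` instead.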
@@ -88,7 +88,8 @@ public class S3AFileSystem extends FileSystem {
  private int maxKeys;
  private long partSize;
  private TransferManager transfers;
-  private int partSizeThreshold;
+  private ThreadPoolExecutor threadPoolExecutor;
+  private int multiPartThreshold;
  public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
  private CannedAccessControlList cannedACL;
  private String serverSideEncryptionAlgorithm;
@@ -237,7 +238,7 @@ public class S3AFileSystem extends FileSystem {

    maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
+    multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
      DEFAULT_MIN_MULTIPART_THRESHOLD);

    if (partSize < 5 * 1024 * 1024) {
@@ -245,9 +246,9 @@ public class S3AFileSystem extends FileSystem {
      partSize = 5 * 1024 * 1024;
    }

-    if (partSizeThreshold < 5 * 1024 * 1024) {
+    if (multiPartThreshold < 5 * 1024 * 1024) {
      LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-      partSizeThreshold = 5 * 1024 * 1024;
+      multiPartThreshold = 5 * 1024 * 1024;
    }

    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
@@ -262,20 +263,20 @@ public class S3AFileSystem extends FileSystem {
    LinkedBlockingQueue<Runnable> workQueue =
      new LinkedBlockingQueue<>(maxThreads *
        conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+    threadPoolExecutor = new ThreadPoolExecutor(
        coreThreads,
        maxThreads,
        keepAliveTime,
        TimeUnit.SECONDS,
        workQueue,
        newDaemonThreadFactory("s3a-transfer-shared-"));
-    tpe.allowCoreThreadTimeOut(true);
+    threadPoolExecutor.allowCoreThreadTimeOut(true);

    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
    transferConfiguration.setMinimumUploadPartSize(partSize);
-    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+    transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);

-    transfers = new TransferManager(s3, tpe);
+    transfers = new TransferManager(s3, threadPoolExecutor);
    transfers.setConfiguration(transferConfiguration);

    String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
@@ -391,7 +392,12 @@ public class S3AFileSystem extends FileSystem {
    if (!overwrite && exists(f)) {
      throw new FileAlreadyExistsException(f + " already exists");
    }
+    if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
+      return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
+          key, progress, statistics, cannedACL,
+          serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
+          threadPoolExecutor), statistics);
+    }
    // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
    return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
      bucket, key, progress, cannedACL, statistics,
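A minimal usage sketch of the fast path selected above (illustrative only and not part of the commit; the class name, bucket, path, and credential setup are assumed):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

// Hypothetical client code: with fs.s3a.fast.upload enabled, create() above
// returns a stream backed by S3AFastOutputStream instead of the disk-buffered one.
public class FastUploadWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.fast.upload", true);
    // assumes S3 credentials (e.g. fs.s3a.access.key / fs.s3a.secret.key) are configured

    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket"), conf);
    byte[] data = new byte[1024 * 1024];   // 1 MB of zeros, purely illustrative
    try (FSDataOutputStream out = fs.create(new Path("/example/object"), true)) {
      out.write(data);                     // buffered in memory, uploaded on close()
    }
  }
}
```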
@@ -213,13 +213,13 @@ If you do any of these: change your credentials immediately!
<property>
  <name>fs.s3a.connection.establish.timeout</name>
  <value>5000</value>
-  <description>Socket connection setup timeout in seconds.</description>
+  <description>Socket connection setup timeout in milliseconds.</description>
</property>

<property>
  <name>fs.s3a.connection.timeout</name>
  <value>50000</value>
-  <description>Socket connection timeout in seconds.</description>
+  <description>Socket connection timeout in milliseconds.</description>
</property>

<property>
@@ -292,7 +292,7 @@ If you do any of these: change your credentials immediately!
  <name>fs.s3a.buffer.dir</name>
  <value>${hadoop.tmp.dir}/s3a</value>
  <description>Comma separated list of directories that will be used to buffer file
-    uploads to.</description>
+    uploads to. No effect if fs.s3a.fast.upload is true.</description>
</property>

<property>
@@ -301,6 +301,40 @@ If you do any of these: change your credentials immediately!
  <description>The implementation class of the S3A Filesystem</description>
</property>

+### S3AFastOutputStream
+**Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
+
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>false</value>
+  <description>Upload directly from memory instead of buffering to
+  disk first. Memory usage and parallelism can be controlled as up to
+  fs.s3a.multipart.size memory is consumed for each (part)upload actively
+  uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.buffer.size</name>
+  <value>1048576</value>
+  <description>Size (in bytes) of initial memory buffer allocated for an
+  upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
+Writes are buffered in memory instead of to a file on local disk. This
+removes the throughput bottleneck of the local disk write and read cycle
+before starting the actual upload. Furthermore, it allows handling files that
+are larger than the remaining local disk space.
+
+However, non-trivial memory tuning is needed for optimal results and careless
+settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
+(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
+additional (part)uploads can be waiting (and thus memory buffers are created).
+The memory buffer is uploaded as a single upload if it is not larger than
+`fs.s3a.multipart.threshold`. Else, a multi-part upload is initiated and
+parts of size `fs.s3a.multipart.size` are used to protect against overflowing
+the available memory. These settings should be tuned to the envisioned
+workflow (some large files, many small ones, ...) and the physical
+limitations of the machine and cluster (memory, network bandwidth).

## Testing the S3 filesystem clients
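To put the tuning guidance above in concrete terms (illustrative numbers, not defaults): with `fs.s3a.threads.max` = 8, `fs.s3a.max.total.tasks` = 4 and `fs.s3a.multipart.size` = 10485760 (10 MB), up to 8 parts may be uploading at once and, since the executor queue is sized at `fs.s3a.threads.max * fs.s3a.max.total.tasks`, up to 32 further parts may be queued, so a busy client could hold on the order of (8 + 32) × 10 MB ≈ 400 MB of buffered part data in memory, in addition to the active write buffer of each open stream.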
@@ -334,7 +368,7 @@ each filesystem for its testing.
The contents of each bucket will be destroyed during the test process:
do not use the bucket for any purpose other than testing. Furthermore, for
s3a, all in-progress multi-part uploads to the bucket will be aborted at the
-start of a test (by forcing fs.s3a.multipart.purge=true) to clean up the
+start of a test (by forcing `fs.s3a.multipart.purge=true`) to clean up the
temporary state of previously failed tests.

Example:
@@ -392,14 +426,14 @@ Example:
## File `contract-test-options.xml`

The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
-must be created and configured for the test fileystems.
+must be created and configured for the test filesystems.

If a specific file `fs.contract.test.fs.*` test path is not defined for
any of the filesystems, those tests will be skipped.

The standard S3 authentication details must also be provided. This can be
through copy-and-paste of the `auth-keys.xml` credentials, or it can be
-through direct XInclude inclustion.
+through direct XInclude inclusion.

#### s3://
@@ -0,0 +1,74 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * Tests regular and multi-part upload functionality for S3AFastOutputStream.
 * File sizes are kept small to reduce test duration on slow connections
 */
public class TestS3AFastOutputStream {
  private FileSystem fs;


  @Rule
  public Timeout testTimeout = new Timeout(30 * 60 * 1000);

  @Before
  public void setUp() throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
    conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
    conf.setBoolean(Constants.FAST_UPLOAD, true);
    fs = S3ATestUtils.createTestFileSystem(conf);
  }

  @After
  public void tearDown() throws Exception {
    if (fs != null) {
      fs.delete(getTestPath(), true);
    }
  }

  protected Path getTestPath() {
    return new Path("/tests3a");
  }

  @Test
  public void testRegularUpload() throws IOException {
    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
  }

  @Test
  public void testMultiPartUpload() throws IOException {
    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
        1024);
  }
}