HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks (#5543)

Contributed By: HarshitGupta and Steve Loughran
This commit is contained in:
Steve Loughran 2023-04-12 00:47:45 +01:00 committed by GitHub
parent bffa49a64f
commit 7c3d94a032
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 465 additions and 73 deletions

View File

@ -108,6 +108,7 @@
<configuration>
<forkCount>${testsThreadCount}</forkCount>
<reuseForks>false</reuseForks>
<trimStackTrace>false</trimStackTrace>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
@ -272,6 +273,7 @@
<goal>verify</goal>
</goals>
<configuration>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>

View File

@ -1255,4 +1255,25 @@ public final class Constants {
*/
public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;
/**
* Option to enable or disable the multipart uploads.
* Value: {@value}.
* <p>
* Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}.
*/
public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
/**
* Default value for multipart uploads.
* {@value}
*/
public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true;
/**
* Stream supports multipart uploads to the given path.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
"fs.s3a.capability.multipart.uploads.enabled";
}

View File

@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
private final String key;
/** Size of all blocks. */
private final int blockSize;
private final long blockSize;
/** IO Statistics. */
private final IOStatistics iostatistics;
@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;
/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements
this.builder = builder;
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics.getIOStatistics();
@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements
(ProgressListener) progress
: new ProgressableListener(progress);
downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
// look for multipart support.
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
// block size is infinite if multipart is disabled, so ignore
// what was passed in from the builder.
this.blockSize = isMultipartUploadEnabled
? builder.blockSize
: -1;
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
}
/**
@ -318,7 +329,15 @@ class S3ABlockOutputStream extends OutputStream implements
statistics.writeBytes(len);
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
int remainingCapacity = block.remainingCapacity();
if (!isMultipartUploadEnabled) {
// no need to check for space as multipart uploads
// are not available...everything is saved to a single
// (disk) block.
return;
}
// look to see if another block is needed to complete
// the upload or exactly a block was written.
int remainingCapacity = (int) block.remainingCapacity();
if (written < len) {
// not everything was written the block has run out
// of capacity
@ -369,6 +388,8 @@ class S3ABlockOutputStream extends OutputStream implements
*/
@Retries.RetryTranslated
private void initMultipartUpload() throws IOException {
Preconditions.checkState(isMultipartUploadEnabled,
"multipart upload is disabled");
if (multiPartUpload == null) {
LOG.debug("Initiating Multipart upload");
multiPartUpload = new MultiPartUpload(key);
@ -558,19 +579,20 @@ class S3ABlockOutputStream extends OutputStream implements
}
/**
* Upload the current block as a single PUT request; if the buffer
* is empty a 0-byte PUT will be invoked, as it is needed to create an
* entry at the far end.
* @throws IOException any problem.
* @return number of bytes uploaded. If thread was interrupted while
* waiting for upload to complete, returns zero with interrupted flag set
* on this thread.
* Upload the current block as a single PUT request; if the buffer is empty a
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
* end.
* @return number of bytes uploaded. If thread was interrupted while waiting
* for upload to complete, returns zero with interrupted flag set on this
* thread.
* @throws IOException
* any problem.
*/
private int putObject() throws IOException {
private long putObject() throws IOException {
LOG.debug("Executing regular upload for {}", writeOperationHelper);
final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
@ -617,6 +639,7 @@ class S3ABlockOutputStream extends OutputStream implements
"S3ABlockOutputStream{");
sb.append(writeOperationHelper.toString());
sb.append(", blockSize=").append(blockSize);
sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
// unsynced access; risks consistency in exchange for no risk of deadlock.
S3ADataBlocks.DataBlock block = activeBlock;
if (block != null) {
@ -835,7 +858,7 @@ class S3ABlockOutputStream extends OutputStream implements
Preconditions.checkNotNull(uploadId, "Null uploadId");
maybeRethrowUploadFailure();
partsSubmitted++;
final int size = block.dataSize();
final long size = block.dataSize();
bytesSubmitted += size;
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request;
@ -1011,7 +1034,7 @@ class S3ABlockOutputStream extends OutputStream implements
ProgressEventType eventType = progressEvent.getEventType();
long bytesTransferred = progressEvent.getBytesTransferred();
int size = block.dataSize();
long size = block.dataSize();
switch (eventType) {
case REQUEST_BYTE_TRANSFER_EVENT:
@ -1126,6 +1149,11 @@ class S3ABlockOutputStream extends OutputStream implements
*/
private IOStatisticsAggregator ioStatisticsAggregator;
/**
* Is Multipart Uploads enabled for the given upload.
*/
private boolean isMultipartUploadEnabled;
private BlockOutputStreamBuilder() {
}
@ -1276,5 +1304,11 @@ class S3ABlockOutputStream extends OutputStream implements
ioStatisticsAggregator = value;
return this;
}
public BlockOutputStreamBuilder withMultipartEnabled(
final boolean value) {
isMultipartUploadEnabled = value;
return this;
}
}
}

View File

@ -180,7 +180,7 @@ final class S3ADataBlocks {
* @param statistics stats to work with
* @return a new block.
*/
abstract DataBlock create(long index, int limit,
abstract DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException;
@ -258,7 +258,7 @@ final class S3ADataBlocks {
* Return the current data size.
* @return the size of the data
*/
abstract int dataSize();
abstract long dataSize();
/**
* Predicate to verify that the block has the capacity to write
@ -280,7 +280,7 @@ final class S3ADataBlocks {
* The remaining capacity in the block before it is full.
* @return the number of bytes remaining.
*/
abstract int remainingCapacity();
abstract long remainingCapacity();
/**
* Write a series of bytes from the buffer, from the offset.
@ -391,9 +391,11 @@ final class S3ADataBlocks {
}
@Override
DataBlock create(long index, int limit,
DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
return new ByteArrayBlock(0, limit, statistics);
}
@ -436,11 +438,11 @@ final class S3ADataBlocks {
private Integer dataSize;
ByteArrayBlock(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
buffer = new S3AByteArrayOutputStream(this.limit);
blockAllocated();
}
@ -449,7 +451,7 @@ final class S3ADataBlocks {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : buffer.size();
}
@ -468,14 +470,14 @@ final class S3ADataBlocks {
}
@Override
int remainingCapacity() {
long remainingCapacity() {
return limit - dataSize();
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
buffer.write(b, offset, written);
return written;
}
@ -514,9 +516,11 @@ final class S3ADataBlocks {
}
@Override
ByteBufferBlock create(long index, int limit,
ByteBufferBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
return new ByteBufferBlock(index, limit, statistics);
}
@ -564,11 +568,12 @@ final class S3ADataBlocks {
* @param statistics statistics to update
*/
ByteBufferBlock(long index,
int bufferSize,
long bufferSize,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.bufferSize = bufferSize;
blockBuffer = requestBuffer(bufferSize);
this.bufferSize = bufferSize > Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) bufferSize;
blockBuffer = requestBuffer(this.bufferSize);
blockAllocated();
}
@ -577,7 +582,7 @@ final class S3ADataBlocks {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : bufferCapacityUsed();
}
@ -598,7 +603,7 @@ final class S3ADataBlocks {
}
@Override
public int remainingCapacity() {
public long remainingCapacity() {
return blockBuffer != null ? blockBuffer.remaining() : 0;
}
@ -609,7 +614,7 @@ final class S3ADataBlocks {
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
blockBuffer.put(b, offset, written);
return written;
}
@ -802,16 +807,18 @@ final class S3ADataBlocks {
* Create a temp file and a {@link DiskBlock} instance to manage it.
*
* @param index block index
* @param limit limit of the block.
* @param limit limit of the block. -1 means "no limit"
* @param statistics statistics to update
* @return the new block
* @throws IOException IO problems
*/
@Override
DataBlock create(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit != 0,
"Invalid block size: %d", limit);
File destFile = getOwner()
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
limit, getOwner().getConf());
@ -825,14 +832,14 @@ final class S3ADataBlocks {
*/
static class DiskBlock extends DataBlock {
private int bytesWritten;
private long bytesWritten;
private final File bufferFile;
private final int limit;
private final long limit;
private BufferedOutputStream out;
private final AtomicBoolean closed = new AtomicBoolean(false);
DiskBlock(File bufferFile,
int limit,
long limit,
long index,
BlockOutputStreamStatistics statistics)
throws FileNotFoundException {
@ -844,24 +851,39 @@ final class S3ADataBlocks {
}
@Override
int dataSize() {
long dataSize() {
return bytesWritten;
}
/**
* Does this block have unlimited space?
* @return true if a block with no size limit was created.
*/
private boolean unlimited() {
return limit < 0;
}
@Override
boolean hasCapacity(long bytes) {
return dataSize() + bytes <= limit;
return unlimited() || dataSize() + bytes <= limit;
}
/**
* {@inheritDoc}.
* If there is no limit to capacity, return MAX_VALUE.
* @return capacity in the block.
*/
@Override
int remainingCapacity() {
return limit - bytesWritten;
long remainingCapacity() {
return unlimited()
? Integer.MAX_VALUE
: limit - bytesWritten;
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
out.write(b, offset, written);
bytesWritten += written;
return written;

View File

@ -413,6 +413,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;
/**
* Does this S3A FS instance have multipart upload enabled?
*/
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
@ -543,7 +548,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
initThreadPools(conf);
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
@ -605,7 +611,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
@ -614,8 +619,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
blockOutputActiveBlocks = 1;
}
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks);
" queue limit={}; multipart={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());
@ -1092,6 +1097,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.withMultipartUploadEnabled(isMultipartUploadEnabled)
.build();
}
@ -1842,6 +1848,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final PutObjectOptions putOptions =
new PutObjectOptions(keep, null, options.getHeaders());
validateOutputStreamConfiguration(path, getConf());
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
@ -1865,7 +1873,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(isMultipartUploadEnabled);
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
@ -5103,6 +5112,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
return !keepDirectoryMarkers(path);
case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED:
return isMultipartUploadEnabled();
// create file options
case FS_S3A_CREATE_PERFORMANCE:
case FS_S3A_CREATE_HEADER:
@ -5419,4 +5431,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public boolean isCSEEnabled() {
return isCSEEnabled;
}
public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
}

View File

@ -1547,7 +1547,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
* of block uploads pending (1) and the bytes pending (blockSize).
*/
@Override
public void blockUploadQueued(int blockSize) {
public void blockUploadQueued(long blockSize) {
incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS);
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize);
@ -1560,7 +1560,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
* {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}.
*/
@Override
public void blockUploadStarted(Duration timeInQueue, int blockSize) {
public void blockUploadStarted(Duration timeInQueue, long blockSize) {
// the local counter is used in toString reporting.
queueDuration.addAndGet(timeInQueue.toMillis());
// update the duration fields in the IOStatistics.
@ -1588,7 +1588,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
@Override
public void blockUploadCompleted(
Duration timeSinceUploadStarted,
int blockSize) {
long blockSize) {
transferDuration.addAndGet(timeSinceUploadStarted.toMillis());
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
blockUploadsCompleted.incrementAndGet();
@ -1602,7 +1602,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
@Override
public void blockUploadFailed(
Duration timeSinceUploadStarted,
int blockSize) {
long blockSize) {
incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
@ -1031,6 +1032,38 @@ public final class S3AUtils {
return partSize;
}
/**
* Validates the output stream configuration.
* @param path path: for error messages
* @param conf : configuration object for the given context
* @throws PathIOException Unsupported configuration.
*/
public static void validateOutputStreamConfiguration(final Path path,
Configuration conf) throws PathIOException {
if(!checkDiskBuffer(conf)){
throw new PathIOException(path.toString(),
"Unable to create OutputStream with the given"
+ " multipart upload and buffer configuration.");
}
}
/**
* Check whether the configuration for S3ABlockOutputStream is
* consistent or not. Multipart uploads allow all kinds of fast buffers to
* be supported. When the option is disabled only disk buffers are allowed to
* be used as the file size might be bigger than the buffer size that can be
* allocated.
* @param conf : configuration object for the given context
* @return true if the disk buffer and the multipart settings are supported
*/
public static boolean checkDiskBuffer(Configuration conf) {
boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
return isMultipartUploadEnabled
|| FAST_UPLOAD_BUFFER_DISK.equals(
conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
}
/**
* Ensure that the long value is in the range of an integer.
* @param name property name for error messages

View File

@ -269,8 +269,6 @@ public class WriteOperationHelper implements WriteOperations {
String dest,
File sourceFile,
final PutObjectOptions options) {
Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
"File length is too big for a single PUT upload");
activateAuditSpan();
final ObjectMetadata objectMetadata =
newObjectMetadata((int) sourceFile.length());
@ -532,7 +530,7 @@ public class WriteOperationHelper implements WriteOperations {
String destKey,
String uploadId,
int partNumber,
int size,
long size,
InputStream uploadStream,
File sourceFile,
Long offset) throws IOException {

View File

@ -233,7 +233,7 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
String destKey,
String uploadId,
int partNumber,
int size,
long size,
InputStream uploadStream,
File sourceFile,
Long offset) throws IOException;

View File

@ -196,10 +196,11 @@ public interface RequestFactory {
* @param destKey destination object key
* @param options options for the request
* @return the request.
* @throws PathIOException if multipart uploads are disabled
*/
InitiateMultipartUploadRequest newMultipartUploadRequest(
String destKey,
@Nullable PutObjectOptions options);
@Nullable PutObjectOptions options) throws PathIOException;
/**
* Complete a multipart upload.
@ -248,7 +249,7 @@ public interface RequestFactory {
String destKey,
String uploadId,
int partNumber,
int size,
long size,
InputStream uploadStream,
File sourceFile,
long offset) throws PathIOException;

View File

@ -217,6 +217,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
if (!fs.isMultipartUploadEnabled()) {
throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ " the committer can't proceed.");
}
// set this thread's context with the job ID.
// audit spans created in this thread will pick
// up this value., including the commit operations instance

View File

@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final StorageClass storageClass;
/**
* Is multipart upload enabled.
*/
private final boolean isMultipartUploadEnabled;
/**
* Constructor.
* @param builder builder with all the configuration.
@ -137,6 +142,7 @@ public class RequestFactoryImpl implements RequestFactory {
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
}
/**
@ -460,7 +466,10 @@ public class RequestFactoryImpl implements RequestFactory {
@Override
public InitiateMultipartUploadRequest newMultipartUploadRequest(
final String destKey,
@Nullable final PutObjectOptions options) {
@Nullable final PutObjectOptions options) throws PathIOException {
if (!isMultipartUploadEnabled) {
throw new PathIOException(destKey, "Multipart uploads are disabled.");
}
final ObjectMetadata objectMetadata = newObjectMetadata(-1);
maybeSetMetadata(options, objectMetadata);
final InitiateMultipartUploadRequest initiateMPURequest =
@ -509,7 +518,7 @@ public class RequestFactoryImpl implements RequestFactory {
String destKey,
String uploadId,
int partNumber,
int size,
long size,
InputStream uploadStream,
File sourceFile,
long offset) throws PathIOException {
@ -682,6 +691,11 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private PrepareRequest requestPreparer;
/**
* Is Multipart Enabled on the path.
*/
private boolean isMultipartUploadEnabled = true;
private RequestFactoryBuilder() {
}
@ -767,6 +781,18 @@ public class RequestFactoryImpl implements RequestFactory {
this.requestPreparer = value;
return this;
}
/**
* Multipart upload enabled.
*
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withMultipartUploadEnabled(
final boolean value) {
this.isMultipartUploadEnabled = value;
return this;
}
}
/**

View File

@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable,
* Block is queued for upload.
* @param blockSize block size.
*/
void blockUploadQueued(int blockSize);
void blockUploadQueued(long blockSize);
/**
* Queued block has been scheduled for upload.
* @param timeInQueue time in the queue.
* @param blockSize block size.
*/
void blockUploadStarted(Duration timeInQueue, int blockSize);
void blockUploadStarted(Duration timeInQueue, long blockSize);
/**
* A block upload has completed. Duration excludes time in the queue.
* @param timeSinceUploadStarted time in since the transfer began.
* @param blockSize block size
*/
void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize);
void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize);
/**
* A block upload has failed. Duration excludes time in the queue.
@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable,
* @param timeSinceUploadStarted time in since the transfer began.
* @param blockSize block size
*/
void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize);
void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize);
/**
* Intermediate report of bytes uploaded.

View File

@ -442,22 +442,22 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
implements BlockOutputStreamStatistics {
@Override
public void blockUploadQueued(final int blockSize) {
public void blockUploadQueued(final long blockSize) {
}
@Override
public void blockUploadStarted(final Duration timeInQueue,
final int blockSize) {
final long blockSize) {
}
@Override
public void blockUploadCompleted(final Duration timeSinceUploadStarted,
final int blockSize) {
final long blockSize) {
}
@Override
public void blockUploadFailed(final Duration timeSinceUploadStarted,
final int blockSize) {
final long blockSize) {
}
@Override

View File

@ -1727,7 +1727,9 @@ The "fast" output stream
1. Uploads large files as blocks with the size set by
`fs.s3a.multipart.size`. That is: the threshold at which multipart uploads
begin and the size of each upload are identical.
begin and the size of each upload are identical. This behavior can be enabled
or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by
default is set to true.
1. Buffers blocks to disk (default) or in on-heap or off-heap memory.
1. Uploads blocks in parallel in background threads.
1. Begins uploading blocks as soon as the buffered data exceeds this partition

View File

@ -200,6 +200,11 @@ public class MockS3AFileSystem extends S3AFileSystem {
return true;
}
@Override
public boolean isMultipartUploadEnabled() {
return true;
}
/**
* Make operation to set the s3 client public.
* @param client client.

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Verify that the magic committer cannot be created if the FS doesn't support multipart
* uploads.
*/
public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
MAGIC_COMMITTER_ENABLED,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
MULTIPART_UPLOADS_ENABLED);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
return conf;
}
@Test
public void testCreateCommitter() throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = methodPath();
LOG.debug("Trying to create a committer on the path: {}", commitPath);
intercept(PathCommitException.class,
() -> new MagicS3GuardCommitter(commitPath, tContext));
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Verify that a staging committer cannot be created if the FS doesn't support multipart
* uploads.
*/
public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
MULTIPART_UPLOADS_ENABLED);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
return conf;
}
@Test
public void testCreateCommitter() throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = methodPath();
LOG.debug("Trying to create a committer on the path: {}", commitPath);
intercept(PathCommitException.class,
() -> new StagingCommitter(commitPath, tContext));
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
@ -155,7 +156,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
* Create objects through the factory.
* @param factory factory
*/
private void createFactoryObjects(RequestFactory factory) {
private void createFactoryObjects(RequestFactory factory) throws IOException {
String path = "path";
String path2 = "path2";
String id = "1";

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
/**
* Test a file upload using a single PUT operation. Multipart uploads will
* be disabled in the test.
*/
public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
public static final Logger LOG = LoggerFactory.getLogger(
ITestS3AHugeFileUploadSinglePut.class);
private long fileSize;
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
FAST_UPLOAD_BUFFER,
IO_CHUNK_BUFFER_SIZE,
KEY_HUGE_FILESIZE,
MULTIPART_UPLOADS_ENABLED,
MULTIPART_SIZE,
REQUEST_TIMEOUT);
conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
DEFAULT_HUGE_FILESIZE);
// set a small part size to verify it does not impact block allocation size
conf.setLong(MULTIPART_SIZE, 10_000);
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.set(REQUEST_TIMEOUT, "1h");
return conf;
}
@Test
public void uploadFileSinglePut() throws IOException {
LOG.info("Creating file with size : {}", fileSize);
S3AFileSystem fs = getFileSystem();
ContractTestUtils.createAndVerifyFile(fs,
methodPath(), fileSize);
// Exactly three put requests should be made during the upload of the file
// First one being the creation of the directory marker
// Second being the creation of the test file
// Third being the creation of directory marker on the file delete
assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
.isEqualTo(3);
}
}