From 7c3d94a032ba0bfafb2d1ff35d4675cb6b5618d9 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 12 Apr 2023 00:47:45 +0100 Subject: [PATCH] HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks (#5543) Contributed By: HarshitGupta and Steve Loughran --- hadoop-tools/hadoop-aws/pom.xml | 2 + .../org/apache/hadoop/fs/s3a/Constants.java | 21 +++++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 72 +++++++++++---- .../apache/hadoop/fs/s3a/S3ADataBlocks.java | 76 ++++++++++------ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 26 ++++-- .../hadoop/fs/s3a/S3AInstrumentation.java | 8 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 33 +++++++ .../hadoop/fs/s3a/WriteOperationHelper.java | 4 +- .../apache/hadoop/fs/s3a/WriteOperations.java | 2 +- .../hadoop/fs/s3a/api/RequestFactory.java | 5 +- .../fs/s3a/commit/AbstractS3ACommitter.java | 4 + .../fs/s3a/impl/RequestFactoryImpl.java | 30 ++++++- .../BlockOutputStreamStatistics.java | 8 +- .../impl/EmptyS3AStatisticsContext.java | 8 +- .../site/markdown/tools/hadoop-aws/index.md | 4 +- .../hadoop/fs/s3a/MockS3AFileSystem.java | 5 ++ .../ITestMagicCommitProtocolFailure.java | 69 ++++++++++++++ .../ITestStagingCommitProtocolFailure.java | 69 ++++++++++++++ .../fs/s3a/impl/TestRequestFactory.java | 3 +- .../ITestS3AHugeFileUploadSinglePut.java | 89 +++++++++++++++++++ 20 files changed, 465 insertions(+), 73 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 6ebf1c71f0d..ae8db93329d 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -108,6 +108,7 @@ ${testsThreadCount} false + false ${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true ${testsThreadCount} @@ -272,6 +273,7 @@ verify + false ${fs.s3a.scale.test.enabled} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 16472a75fd2..a59a07c8437 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1255,4 +1255,25 @@ public final class Constants { */ public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count"; public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8; + + /** + * Option to enable or disable the multipart uploads. + * Value: {@value}. + *
+ * Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}. + */ + public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled"; + + /** + * Default value for multipart uploads. + * {@value} + */ + public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true; + + /** + * Stream supports multipart uploads to the given path. + */ + public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = + "fs.s3a.capability.multipart.uploads.enabled"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 19943ff2f70..df3c9315ba8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements private final String key; /** Size of all blocks. */ - private final int blockSize; + private final long blockSize; /** IO Statistics. */ private final IOStatistics iostatistics; @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements /** Thread level IOStatistics Aggregator. */ private final IOStatisticsAggregator threadIOStatisticsAggregator; + /** Is multipart upload enabled? */ + private final boolean isMultipartUploadEnabled; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -181,7 +184,6 @@ class S3ABlockOutputStream extends OutputStream implements this.builder = builder; this.key = builder.key; this.blockFactory = builder.blockFactory; - this.blockSize = (int) builder.blockSize; this.statistics = builder.statistics; // test instantiations may not provide statistics; this.iostatistics = statistics.getIOStatistics(); @@ -195,17 +197,26 @@ class S3ABlockOutputStream extends OutputStream implements (ProgressListener) progress : new ProgressableListener(progress); downgradeSyncableExceptions = builder.downgradeSyncableExceptions; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); - LOG.debug("Initialized S3ABlockOutputStream for {}" + - " output to {}", key, activeBlock); + + // look for multipart support. + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; + // block size is infinite if multipart is disabled, so ignore + // what was passed in from the builder. + this.blockSize = isMultipartUploadEnabled + ? builder.blockSize + : -1; + if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); } this.isCSEEnabled = builder.isCSEEnabled; this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator; + // create that first block. This guarantees that an open + close sequence + // writes a 0-byte entry. + createBlockIfNeeded(); + LOG.debug("Initialized S3ABlockOutputStream for {}" + + " output to {}", key, activeBlock); } /** @@ -318,7 +329,15 @@ class S3ABlockOutputStream extends OutputStream implements statistics.writeBytes(len); S3ADataBlocks.DataBlock block = createBlockIfNeeded(); int written = block.write(source, offset, len); - int remainingCapacity = block.remainingCapacity(); + if (!isMultipartUploadEnabled) { + // no need to check for space as multipart uploads + // are not available...everything is saved to a single + // (disk) block. 
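+ // the block is uploaded as a single PUT request when the
+ // stream is closed.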
+ return; + } + // look to see if another block is needed to complete + // the upload or exactly a block was written. + int remainingCapacity = (int) block.remainingCapacity(); if (written < len) { // not everything was written —the block has run out // of capacity @@ -369,6 +388,8 @@ class S3ABlockOutputStream extends OutputStream implements */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { + Preconditions.checkState(isMultipartUploadEnabled, + "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); multiPartUpload = new MultiPartUpload(key); @@ -558,19 +579,20 @@ class S3ABlockOutputStream extends OutputStream implements } /** - * Upload the current block as a single PUT request; if the buffer - * is empty a 0-byte PUT will be invoked, as it is needed to create an - * entry at the far end. - * @throws IOException any problem. - * @return number of bytes uploaded. If thread was interrupted while - * waiting for upload to complete, returns zero with interrupted flag set - * on this thread. + * Upload the current block as a single PUT request; if the buffer is empty a + * 0-byte PUT will be invoked, as it is needed to create an entry at the far + * end. + * @return number of bytes uploaded. If thread was interrupted while waiting + * for upload to complete, returns zero with interrupted flag set on this + * thread. + * @throws IOException + * any problem. */ - private int putObject() throws IOException { + private long putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); - int size = block.dataSize(); + long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( @@ -617,6 +639,7 @@ class S3ABlockOutputStream extends OutputStream implements "S3ABlockOutputStream{"); sb.append(writeOperationHelper.toString()); sb.append(", blockSize=").append(blockSize); + sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled); // unsynced access; risks consistency in exchange for no risk of deadlock. S3ADataBlocks.DataBlock block = activeBlock; if (block != null) { @@ -835,7 +858,7 @@ class S3ABlockOutputStream extends OutputStream implements Preconditions.checkNotNull(uploadId, "Null uploadId"); maybeRethrowUploadFailure(); partsSubmitted++; - final int size = block.dataSize(); + final long size = block.dataSize(); bytesSubmitted += size; final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request; @@ -1011,7 +1034,7 @@ class S3ABlockOutputStream extends OutputStream implements ProgressEventType eventType = progressEvent.getEventType(); long bytesTransferred = progressEvent.getBytesTransferred(); - int size = block.dataSize(); + long size = block.dataSize(); switch (eventType) { case REQUEST_BYTE_TRANSFER_EVENT: @@ -1126,6 +1149,11 @@ class S3ABlockOutputStream extends OutputStream implements */ private IOStatisticsAggregator ioStatisticsAggregator; + /** + * Is Multipart Uploads enabled for the given upload. 
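+ * When disabled, the stream buffers everything into a single unbounded
+ * disk block and uploads it with one PUT request on close().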
+ */ + private boolean isMultipartUploadEnabled; + private BlockOutputStreamBuilder() { } @@ -1276,5 +1304,11 @@ class S3ABlockOutputStream extends OutputStream implements ioStatisticsAggregator = value; return this; } + + public BlockOutputStreamBuilder withMultipartEnabled( + final boolean value) { + isMultipartUploadEnabled = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 03b5bd96162..b20d8e859aa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -180,7 +180,7 @@ final class S3ADataBlocks { * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(long index, int limit, + abstract DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException; @@ -258,7 +258,7 @@ final class S3ADataBlocks { * Return the current data size. * @return the size of the data */ - abstract int dataSize(); + abstract long dataSize(); /** * Predicate to verify that the block has the capacity to write @@ -280,7 +280,7 @@ final class S3ADataBlocks { * The remaining capacity in the block before it is full. * @return the number of bytes remaining. */ - abstract int remainingCapacity(); + abstract long remainingCapacity(); /** * Write a series of bytes from the buffer, from the offset. @@ -391,9 +391,11 @@ final class S3ADataBlocks { } @Override - DataBlock create(long index, int limit, + DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit > 0, + "Invalid block size: %d", limit); return new ByteArrayBlock(0, limit, statistics); } @@ -436,11 +438,11 @@ final class S3ADataBlocks { private Integer dataSize; ByteArrayBlock(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.limit = limit; - buffer = new S3AByteArrayOutputStream(limit); + this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; + buffer = new S3AByteArrayOutputStream(this.limit); blockAllocated(); } @@ -449,7 +451,7 @@ final class S3ADataBlocks { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? 
dataSize : buffer.size(); } @@ -468,14 +470,14 @@ final class S3ADataBlocks { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - dataSize(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); buffer.write(b, offset, written); return written; } @@ -514,9 +516,11 @@ final class S3ADataBlocks { } @Override - ByteBufferBlock create(long index, int limit, + ByteBufferBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit > 0, + "Invalid block size: %d", limit); return new ByteBufferBlock(index, limit, statistics); } @@ -564,11 +568,12 @@ final class S3ADataBlocks { * @param statistics statistics to update */ ByteBufferBlock(long index, - int bufferSize, + long bufferSize, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.bufferSize = bufferSize; - blockBuffer = requestBuffer(bufferSize); + this.bufferSize = bufferSize > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int) bufferSize; + blockBuffer = requestBuffer(this.bufferSize); blockAllocated(); } @@ -577,7 +582,7 @@ final class S3ADataBlocks { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : bufferCapacityUsed(); } @@ -598,7 +603,7 @@ final class S3ADataBlocks { } @Override - public int remainingCapacity() { + public long remainingCapacity() { return blockBuffer != null ? blockBuffer.remaining() : 0; } @@ -609,7 +614,7 @@ final class S3ADataBlocks { @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); blockBuffer.put(b, offset, written); return written; } @@ -802,16 +807,18 @@ final class S3ADataBlocks { * Create a temp file and a {@link DiskBlock} instance to manage it. * * @param index block index - * @param limit limit of the block. + * @param limit limit of the block. -1 means "no limit" * @param statistics statistics to update * @return the new block * @throws IOException IO problems */ @Override DataBlock create(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) throws IOException { + Preconditions.checkArgument(limit != 0, + "Invalid block size: %d", limit); File destFile = getOwner() .createTmpFileForWrite(String.format("s3ablock-%04d-", index), limit, getOwner().getConf()); @@ -825,14 +832,14 @@ final class S3ADataBlocks { */ static class DiskBlock extends DataBlock { - private int bytesWritten; + private long bytesWritten; private final File bufferFile; - private final int limit; + private final long limit; private BufferedOutputStream out; private final AtomicBoolean closed = new AtomicBoolean(false); DiskBlock(File bufferFile, - int limit, + long limit, long index, BlockOutputStreamStatistics statistics) throws FileNotFoundException { @@ -844,24 +851,39 @@ final class S3ADataBlocks { } @Override - int dataSize() { + long dataSize() { return bytesWritten; } + /** + * Does this block have unlimited space? + * @return true if a block with no size limit was created. 
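+ * A limit of -1 is passed down when multipart uploads are disabled.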
+ */ + private boolean unlimited() { + return limit < 0; + } + @Override boolean hasCapacity(long bytes) { - return dataSize() + bytes <= limit; + return unlimited() || dataSize() + bytes <= limit; } + /** + * {@inheritDoc}. + * If there is no limit to capacity, return MAX_VALUE. + * @return capacity in the block. + */ @Override - int remainingCapacity() { - return limit - bytesWritten; + long remainingCapacity() { + return unlimited() + ? Integer.MAX_VALUE + : limit - bytesWritten; } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); out.write(b, offset, written); bytesWritten += written; return written; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e96feb0243a..a73bd55b55e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -413,6 +413,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * Does this S3A FS instance have multipart upload enabled? + */ + private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED; + /** * A cache of files that should be deleted when the FileSystem is closed * or the JVM is exited. @@ -543,7 +548,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, this.prefetchBlockSize = (int) prefetchBlockSizeLong; this.prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1); - + this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + DEFAULT_MULTIPART_UPLOAD_ENABLED); initThreadPools(conf); int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); @@ -605,7 +611,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); - partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); @@ -614,8 +619,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, blockOutputActiveBlocks = 1; } LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" + - " queue limit={}", - blockOutputBuffer, partSize, blockOutputActiveBlocks); + " queue limit={}; multipart={}", + blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled); // verify there's no S3Guard in the store config. 
checkNoS3Guard(this.getUri(), getConf()); @@ -1092,6 +1097,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, .withRequestPreparer(getAuditManager()::requestCreated) .withContentEncoding(contentEncoding) .withStorageClass(storageClass) + .withMultipartUploadEnabled(isMultipartUploadEnabled) .build(); } @@ -1842,6 +1848,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, final PutObjectOptions putOptions = new PutObjectOptions(keep, null, options.getHeaders()); + validateOutputStreamConfiguration(path, getConf()); + final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() .withKey(destKey) @@ -1865,7 +1873,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, .withCSEEnabled(isCSEEnabled) .withPutOptions(putOptions) .withIOStatisticsAggregator( - IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) + .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); @@ -5103,6 +5112,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE: return !keepDirectoryMarkers(path); + case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED: + return isMultipartUploadEnabled(); + // create file options case FS_S3A_CREATE_PERFORMANCE: case FS_S3A_CREATE_HEADER: @@ -5419,4 +5431,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, public boolean isCSEEnabled() { return isCSEEnabled; } + + public boolean isMultipartUploadEnabled() { + return isMultipartUploadEnabled; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 9d33efa9d01..da12223570e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1547,7 +1547,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, * of block uploads pending (1) and the bytes pending (blockSize). */ @Override - public void blockUploadQueued(int blockSize) { + public void blockUploadQueued(long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); @@ -1560,7 +1560,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. */ @Override - public void blockUploadStarted(Duration timeInQueue, int blockSize) { + public void blockUploadStarted(Duration timeInQueue, long blockSize) { // the local counter is used in toString reporting. queueDuration.addAndGet(timeInQueue.toMillis()); // update the duration fields in the IOStatistics. 
@@ -1588,7 +1588,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, @Override public void blockUploadCompleted( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); blockUploadsCompleted.incrementAndGet(); @@ -1602,7 +1602,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, @Override public void blockUploadFailed( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 8a1947f3e42..274bc96fb99 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; @@ -1031,6 +1032,38 @@ public final class S3AUtils { return partSize; } + /** + * Validates the output stream configuration. + * @param path path: for error messages + * @param conf : configuration object for the given context + * @throws PathIOException Unsupported configuration. + */ + public static void validateOutputStreamConfiguration(final Path path, + Configuration conf) throws PathIOException { + if(!checkDiskBuffer(conf)){ + throw new PathIOException(path.toString(), + "Unable to create OutputStream with the given" + + " multipart upload and buffer configuration."); + } + } + + /** + * Check whether the configuration for S3ABlockOutputStream is + * consistent or not. Multipart uploads allow all kinds of fast buffers to + * be supported. When the option is disabled only disk buffers are allowed to + * be used as the file size might be bigger than the buffer size that can be + * allocated. + * @param conf : configuration object for the given context + * @return true if the disk buffer and the multipart settings are supported + */ + public static boolean checkDiskBuffer(Configuration conf) { + boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + DEFAULT_MULTIPART_UPLOAD_ENABLED); + return isMultipartUploadEnabled + || FAST_UPLOAD_BUFFER_DISK.equals( + conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER)); + } + /** * Ensure that the long value is in the range of an integer. 
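 * Values above Integer.MAX_VALUE are capped at that maximum, with a
 * warning logged.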
* @param name property name for error messages diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 14ffeed4a55..7f9db33157f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,8 +269,6 @@ public class WriteOperationHelper implements WriteOperations { String dest, File sourceFile, final PutObjectOptions options) { - Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, - "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); @@ -532,7 +530,7 @@ public class WriteOperationHelper implements WriteOperations { String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 321390446f7..32888314d88 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -233,7 +233,7 @@ public interface WriteOperations extends AuditSpanSource, Closeable { String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index cae4d3ef034..2a4771925f0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -196,10 +196,11 @@ public interface RequestFactory { * @param destKey destination object key * @param options options for the request * @return the request. + * @throws PathIOException if multipart uploads are disabled */ InitiateMultipartUploadRequest newMultipartUploadRequest( String destKey, - @Nullable PutObjectOptions options); + @Nullable PutObjectOptions options) throws PathIOException; /** * Complete a multipart upload. 
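 * Only meaningful when multipart uploads are enabled, as the upload ID
 * comes from an earlier initiation request.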
@@ -248,7 +249,7 @@ public interface RequestFactory { String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index d6044edde29..e53c690431e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,6 +217,10 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); + if (!fs.isMultipartUploadEnabled()) { + throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," + + " the committer can't proceed."); + } // set this thread's context with the job ID. // audit spans created in this thread will pick // up this value., including the commit operations instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index ce11df03839..7227941e344 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final StorageClass storageClass; + /** + * Is multipart upload enabled. + */ + private final boolean isMultipartUploadEnabled; + /** * Constructor. * @param builder builder with all the configuration. @@ -137,6 +142,7 @@ public class RequestFactoryImpl implements RequestFactory { this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; } /** @@ -460,7 +466,10 @@ public class RequestFactoryImpl implements RequestFactory { @Override public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, - @Nullable final PutObjectOptions options) { + @Nullable final PutObjectOptions options) throws PathIOException { + if (!isMultipartUploadEnabled) { + throw new PathIOException(destKey, "Multipart uploads are disabled."); + } final ObjectMetadata objectMetadata = newObjectMetadata(-1); maybeSetMetadata(options, objectMetadata); final InitiateMultipartUploadRequest initiateMPURequest = @@ -509,7 +518,7 @@ public class RequestFactoryImpl implements RequestFactory { String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException { @@ -682,6 +691,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private PrepareRequest requestPreparer; + /** + * Is Multipart Enabled on the path. + */ + private boolean isMultipartUploadEnabled = true; + private RequestFactoryBuilder() { } @@ -767,6 +781,18 @@ public class RequestFactoryImpl implements RequestFactory { this.requestPreparer = value; return this; } + + /** + * Multipart upload enabled. 
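+ * If disabled, the factory rejects multipart upload requests with a
+ * PathIOException.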
+ * + * @param value new value + * @return the builder + */ + public RequestFactoryBuilder withMultipartUploadEnabled( + final boolean value) { + this.isMultipartUploadEnabled = value; + return this; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index bd1466b2a43..554b628d003 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable, * Block is queued for upload. * @param blockSize block size. */ - void blockUploadQueued(int blockSize); + void blockUploadQueued(long blockSize); /** * Queued block has been scheduled for upload. * @param timeInQueue time in the queue. * @param blockSize block size. */ - void blockUploadStarted(Duration timeInQueue, int blockSize); + void blockUploadStarted(Duration timeInQueue, long blockSize); /** * A block upload has completed. Duration excludes time in the queue. * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize); + void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize); /** * A block upload has failed. Duration excludes time in the queue. @@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable, * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize); + void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize); /** * Intermediate report of bytes uploaded. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index d10b6484175..6454065b240 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -442,22 +442,22 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext { implements BlockOutputStreamStatistics { @Override - public void blockUploadQueued(final int blockSize) { + public void blockUploadQueued(final long blockSize) { } @Override public void blockUploadStarted(final Duration timeInQueue, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadCompleted(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadFailed(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index ae042b16199..7e2a1c2b120 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1727,7 +1727,9 @@ The "fast" output stream 1. 
Uploads large files as blocks with the size set by `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads - begin and the size of each upload are identical. + begin and the size of each upload are identical. This behavior can be enabled + or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by + default is set to true. 1. Buffers blocks to disk (default) or in on-heap or off-heap memory. 1. Uploads blocks in parallel in background threads. 1. Begins uploading blocks as soon as the buffered data exceeds this partition diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java index a859cd534bb..40857373fb8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java @@ -200,6 +200,11 @@ public class MockS3AFileSystem extends S3AFileSystem { return true; } + @Override + public boolean isMultipartUploadEnabled() { + return true; + } + /** * Make operation to set the s3 client public. * @param client client. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java new file mode 100644 index 00000000000..41593c2b263 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Verify that the magic committer cannot be created if the FS doesn't support multipart + * uploads. + */ +public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + MAGIC_COMMITTER_ENABLED, + S3A_COMMITTER_FACTORY_KEY, + FS_S3A_COMMITTER_NAME, + MULTIPART_UPLOADS_ENABLED); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC); + return conf; + } + + @Test + public void testCreateCommitter() throws Exception { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = methodPath(); + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, + () -> new MagicS3GuardCommitter(commitPath, tContext)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java new file mode 100644 index 00000000000..a6d2c57d1d2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Verify that a staging committer cannot be created if the FS doesn't support multipart + * uploads. + */ +public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + S3A_COMMITTER_FACTORY_KEY, + FS_S3A_COMMITTER_NAME, + MULTIPART_UPLOADS_ENABLED); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING); + return conf; + } + + @Test + public void testCreateCommitter() throws Exception { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = methodPath(); + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, + () -> new StagingCommitter(commitPath, tContext)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 5c243bb820f..7c85142d437 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -155,7 +156,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase { * Create objects through the factory. 
* @param factory factory */ - private void createFactoryObjects(RequestFactory factory) { + private void createFactoryObjects(RequestFactory factory) throws IOException { String path = "path"; String path2 = "path2"; String id = "1"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java new file mode 100644 index 00000000000..08192969e2d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; + +/** + * Test a file upload using a single PUT operation. Multipart uploads will + * be disabled in the test. 
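+ * Disk buffering is used, as the single block must hold the entire file.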
+ */ +public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase { + + public static final Logger LOG = LoggerFactory.getLogger( + ITestS3AHugeFileUploadSinglePut.class); + + private long fileSize; + + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + removeBucketOverrides(getTestBucketName(conf), conf, + FAST_UPLOAD_BUFFER, + IO_CHUNK_BUFFER_SIZE, + KEY_HUGE_FILESIZE, + MULTIPART_UPLOADS_ENABLED, + MULTIPART_SIZE, + REQUEST_TIMEOUT); + conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false); + fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE, + DEFAULT_HUGE_FILESIZE); + // set a small part size to verify it does not impact block allocation size + conf.setLong(MULTIPART_SIZE, 10_000); + conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK); + conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); + conf.set(REQUEST_TIMEOUT, "1h"); + return conf; + } + + @Test + public void uploadFileSinglePut() throws IOException { + LOG.info("Creating file with size : {}", fileSize); + S3AFileSystem fs = getFileSystem(); + ContractTestUtils.createAndVerifyFile(fs, + methodPath(), fileSize); + // Exactly three put requests should be made during the upload of the file + // First one being the creation of the directory marker + // Second being the creation of the test file + // Third being the creation of directory marker on the file delete + assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol()) + .isEqualTo(3); + } +}