Revert "HADOOP-18637. S3A to support upload of files greater than 2 GB using DiskBlocks (#5630)"

This reverts commit df209dd2e3.

Caused test failures because of an incorrect merge conflict resolution.
Mukund Thakur 2023-05-10 14:19:21 -05:00
parent df209dd2e3
commit 86ad35c94c
20 changed files with 73 additions and 457 deletions

View File

@@ -107,7 +107,6 @@
<configuration>
<forkCount>${testsThreadCount}</forkCount>
<reuseForks>false</reuseForks>
<trimStackTrace>false</trimStackTrace>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
@@ -277,7 +276,6 @@
<goal>verify</goal>
</goals>
<configuration>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>

View File

@@ -1256,23 +1256,4 @@ public final class Constants {
*/
public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
/**
* Option to enable or disable the multipart uploads.
* Value: {@value}.
* <p>
* Default is {@link #DEFAULT_MULTIPART_UPLOAD_ENABLED}.
*/
public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
/**
* Default value for multipart uploads.
* {@value}
*/
public static final boolean DEFAULT_MULTIPART_UPLOAD_ENABLED = true;
/**
* Stream supports multipart uploads to the given path.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
"fs.s3a.capability.multipart.uploads.enabled";
}
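
For context, the removed constants above defined the switch introduced by HADOOP-18637. A minimal sketch of how a client would have toggled it through the standard Configuration API; the key string and default come from the removed constants, and after this revert the option is no longer read by S3A:

```java
import org.apache.hadoop.conf.Configuration;

public class MultipartToggleSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key string from the removed MULTIPART_UPLOADS_ENABLED constant;
    // the removed default (DEFAULT_MULTIPART_UPLOAD_ENABLED) was true.
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    System.out.println(
        conf.getBoolean("fs.s3a.multipart.uploads.enabled", true));
  }
}
```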

View File

@@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
private final String key;
/** Size of all blocks. */
private final long blockSize;
private final int blockSize;
/** IO Statistics. */
private final IOStatistics iostatistics;
@@ -169,9 +169,6 @@ class S3ABlockOutputStream extends OutputStream implements
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;
/** Is multipart upload enabled? */
private final boolean isMultipartUploadEnabled;
/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -184,6 +181,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.builder = builder;
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics.getIOStatistics();
@@ -197,26 +195,17 @@ class S3ABlockOutputStream extends OutputStream implements
(ProgressListener) progress
: new ProgressableListener(progress);
downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
// look for multipart support.
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
// block size is infinite if multipart is disabled, so ignore
// what was passed in from the builder.
this.blockSize = isMultipartUploadEnabled
? builder.blockSize
: -1;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
}
this.isCSEEnabled = builder.isCSEEnabled;
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
}
/**
@@ -329,15 +318,7 @@ class S3ABlockOutputStream extends OutputStream implements
statistics.writeBytes(len);
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
if (!isMultipartUploadEnabled) {
// no need to check for space as multipart uploads
// are not available...everything is saved to a single
// (disk) block.
return;
}
// look to see if another block is needed to complete
// the upload or exactly a block was written.
int remainingCapacity = (int) block.remainingCapacity();
int remainingCapacity = block.remainingCapacity();
if (written < len) {
// not everything was written the block has run out
// of capacity
@@ -388,8 +369,6 @@ class S3ABlockOutputStream extends OutputStream implements
*/
@Retries.RetryTranslated
private void initMultipartUpload() throws IOException {
Preconditions.checkState(isMultipartUploadEnabled,
"multipart upload is disabled");
if (multiPartUpload == null) {
LOG.debug("Initiating Multipart upload");
multiPartUpload = new MultiPartUpload(key);
@@ -579,20 +558,19 @@ class S3ABlockOutputStream extends OutputStream implements
}
/**
* Upload the current block as a single PUT request; if the buffer is empty a
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
* end.
* @return number of bytes uploaded. If thread was interrupted while waiting
* for upload to complete, returns zero with interrupted flag set on this
* thread.
* @throws IOException
* any problem.
* Upload the current block as a single PUT request; if the buffer
* is empty a 0-byte PUT will be invoked, as it is needed to create an
* entry at the far end.
* @throws IOException any problem.
* @return number of bytes uploaded. If thread was interrupted while
* waiting for upload to complete, returns zero with interrupted flag set
* on this thread.
*/
private long putObject() throws IOException {
private int putObject() throws IOException {
LOG.debug("Executing regular upload for {}", writeOperationHelper);
final S3ADataBlocks.DataBlock block = getActiveBlock();
long size = block.dataSize();
int size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
@@ -639,7 +617,6 @@ class S3ABlockOutputStream extends OutputStream implements
"S3ABlockOutputStream{");
sb.append(writeOperationHelper.toString());
sb.append(", blockSize=").append(blockSize);
sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled);
// unsynced access; risks consistency in exchange for no risk of deadlock.
S3ADataBlocks.DataBlock block = activeBlock;
if (block != null) {
@@ -858,7 +835,7 @@ class S3ABlockOutputStream extends OutputStream implements
Preconditions.checkNotNull(uploadId, "Null uploadId");
maybeRethrowUploadFailure();
partsSubmitted++;
final long size = block.dataSize();
final int size = block.dataSize();
bytesSubmitted += size;
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request;
@@ -1034,7 +1011,7 @@ class S3ABlockOutputStream extends OutputStream implements
ProgressEventType eventType = progressEvent.getEventType();
long bytesTransferred = progressEvent.getBytesTransferred();
long size = block.dataSize();
int size = block.dataSize();
switch (eventType) {
case REQUEST_BYTE_TRANSFER_EVENT:
@@ -1149,11 +1126,6 @@ class S3ABlockOutputStream extends OutputStream implements
*/
private IOStatisticsAggregator ioStatisticsAggregator;
/**
* Is Multipart Uploads enabled for the given upload.
*/
private boolean isMultipartUploadEnabled;
private BlockOutputStreamBuilder() {
}
@@ -1304,11 +1276,5 @@ class S3ABlockOutputStream extends OutputStream implements
ioStatisticsAggregator = value;
return this;
}
public BlockOutputStreamBuilder withMultipartEnabled(
final boolean value) {
isMultipartUploadEnabled = value;
return this;
}
}
}
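
The write() hunk above restores the unconditional rollover logic: every write fills the active block, and a block that runs out of capacity (or is filled exactly) is handed off for upload. A simplified sketch of that pattern, using hypothetical stand-in names rather than the actual S3A classes:

```java
// Illustrative sketch of the block-rollover pattern in write():
// fill the active block; when it runs out of capacity, hand it off
// for upload and continue with the remaining bytes.
final class BlockRolloverSketch {
  private byte[] active;            // stand-in for the active data block
  private int used;
  private final int blockSize;

  BlockRolloverSketch(int blockSize) {
    this.blockSize = blockSize;
    this.active = new byte[blockSize];
  }

  void write(byte[] src, int offset, int len) {
    int written = Math.min(blockSize - used, len); // remainingCapacity()
    System.arraycopy(src, offset, active, used, written);
    used += written;
    if (written < len) {
      // the block ran out of capacity: upload it, retry the remainder
      uploadCurrentBlockAsync();
      write(src, offset + written, len - written);
    } else if (blockSize - used == 0) {
      // exactly a full block was written: upload it too
      uploadCurrentBlockAsync();
    }
  }

  private void uploadCurrentBlockAsync() {
    // placeholder for queueing the block as one multipart part
    active = new byte[blockSize];
    used = 0;
  }
}
```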

View File

@@ -180,7 +180,7 @@ final class S3ADataBlocks {
* @param statistics stats to work with
* @return a new block.
*/
abstract DataBlock create(long index, long limit,
abstract DataBlock create(long index, int limit,
BlockOutputStreamStatistics statistics)
throws IOException;
@@ -258,7 +258,7 @@ final class S3ADataBlocks {
* Return the current data size.
* @return the size of the data
*/
abstract long dataSize();
abstract int dataSize();
/**
* Predicate to verify that the block has the capacity to write
@@ -280,7 +280,7 @@ final class S3ADataBlocks {
* The remaining capacity in the block before it is full.
* @return the number of bytes remaining.
*/
abstract long remainingCapacity();
abstract int remainingCapacity();
/**
* Write a series of bytes from the buffer, from the offset.
@@ -391,11 +391,9 @@ final class S3ADataBlocks {
}
@Override
DataBlock create(long index, long limit,
DataBlock create(long index, int limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
return new ByteArrayBlock(0, limit, statistics);
}
@@ -438,11 +436,11 @@ final class S3ADataBlocks {
private Integer dataSize;
ByteArrayBlock(long index,
long limit,
int limit,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
buffer = new S3AByteArrayOutputStream(this.limit);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
blockAllocated();
}
@@ -451,7 +449,7 @@ final class S3ADataBlocks {
* @return the amount of data available to upload.
*/
@Override
long dataSize() {
int dataSize() {
return dataSize != null ? dataSize : buffer.size();
}
@@ -470,14 +468,14 @@ final class S3ADataBlocks {
}
@Override
long remainingCapacity() {
int remainingCapacity() {
return limit - dataSize();
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = (int) Math.min(remainingCapacity(), len);
int written = Math.min(remainingCapacity(), len);
buffer.write(b, offset, written);
return written;
}
@@ -516,11 +514,9 @@ final class S3ADataBlocks {
}
@Override
ByteBufferBlock create(long index, long limit,
ByteBufferBlock create(long index, int limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit > 0,
"Invalid block size: %d", limit);
return new ByteBufferBlock(index, limit, statistics);
}
@@ -568,12 +564,11 @@ final class S3ADataBlocks {
* @param statistics statistics to update
*/
ByteBufferBlock(long index,
long bufferSize,
int bufferSize,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.bufferSize = bufferSize > Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) bufferSize;
blockBuffer = requestBuffer(this.bufferSize);
this.bufferSize = bufferSize;
blockBuffer = requestBuffer(bufferSize);
blockAllocated();
}
@@ -582,7 +577,7 @@ final class S3ADataBlocks {
* @return the amount of data available to upload.
*/
@Override
long dataSize() {
int dataSize() {
return dataSize != null ? dataSize : bufferCapacityUsed();
}
@@ -603,7 +598,7 @@ final class S3ADataBlocks {
}
@Override
public long remainingCapacity() {
public int remainingCapacity() {
return blockBuffer != null ? blockBuffer.remaining() : 0;
}
@@ -614,7 +609,7 @@ final class S3ADataBlocks {
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = (int) Math.min(remainingCapacity(), len);
int written = Math.min(remainingCapacity(), len);
blockBuffer.put(b, offset, written);
return written;
}
@@ -807,18 +802,16 @@ final class S3ADataBlocks {
* Create a temp file and a {@link DiskBlock} instance to manage it.
*
* @param index block index
* @param limit limit of the block. -1 means "no limit"
* @param limit limit of the block.
* @param statistics statistics to update
* @return the new block
* @throws IOException IO problems
*/
@Override
DataBlock create(long index,
long limit,
int limit,
BlockOutputStreamStatistics statistics)
throws IOException {
Preconditions.checkArgument(limit != 0,
"Invalid block size: %d", limit);
File destFile = getOwner()
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
limit, getOwner().getConf());
@@ -832,14 +825,14 @@ final class S3ADataBlocks {
*/
static class DiskBlock extends DataBlock {
private long bytesWritten;
private int bytesWritten;
private final File bufferFile;
private final long limit;
private final int limit;
private BufferedOutputStream out;
private final AtomicBoolean closed = new AtomicBoolean(false);
DiskBlock(File bufferFile,
long limit,
int limit,
long index,
BlockOutputStreamStatistics statistics)
throws FileNotFoundException {
@@ -851,39 +844,24 @@ final class S3ADataBlocks {
}
@Override
long dataSize() {
int dataSize() {
return bytesWritten;
}
/**
* Does this block have unlimited space?
* @return true if a block with no size limit was created.
*/
private boolean unlimited() {
return limit < 0;
}
@Override
boolean hasCapacity(long bytes) {
return unlimited() || dataSize() + bytes <= limit;
return dataSize() + bytes <= limit;
}
/**
* {@inheritDoc}.
* If there is no limit to capacity, return MAX_VALUE.
* @return capacity in the block.
*/
@Override
long remainingCapacity() {
return unlimited()
? Integer.MAX_VALUE
: limit - bytesWritten;
int remainingCapacity() {
return limit - bytesWritten;
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = (int) Math.min(remainingCapacity(), len);
int written = Math.min(remainingCapacity(), len);
out.write(b, offset, written);
bytesWritten += written;
return written;
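
The recurring long-to-int changes in this file are the heart of the revert: with int-typed dataSize() and remainingCapacity(), a single block — and therefore a single PUT when multipart uploads are off — is capped at Integer.MAX_VALUE bytes, just under 2 GiB, which is exactly the limit HADOOP-18637 set out to lift. A small sketch of the arithmetic hazard:

```java
// Why the long -> int revert matters: int sizes cap a block at
// Integer.MAX_VALUE bytes (~2 GiB), and naive narrowing overflows.
public class BlockSizeLimitSketch {
  public static void main(String[] args) {
    long requested = 3L * 1024 * 1024 * 1024;           // a 3 GiB upload
    System.out.println(requested > Integer.MAX_VALUE);  // true: too big
    System.out.println((int) requested);                // -1073741824 (overflow)
  }
}
```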

View File

@@ -413,11 +413,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private ArnResource accessPoint;
/**
* Does this S3A FS instance have multipart upload enabled?
*/
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;
/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
@@ -539,8 +534,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
long prefetchBlockSizeLong =
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE,
@@ -613,6 +606,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
@@ -621,8 +615,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
blockOutputActiveBlocks = 1;
}
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}; multipart={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
" queue limit={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks);
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());
@@ -789,8 +783,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
int activeTasksForBoundedThreadPool = maxThreads;
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
activeTasksForBoundedThreadPool,
waitingTasksForBoundedThreadPool,
keepAliveTime, TimeUnit.SECONDS,
name + "-bounded");
unboundedThreadPool = new ThreadPoolExecutor(
@@ -5437,8 +5431,4 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public boolean isCSEEnabled() {
return isCSEEnabled;
}
public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
}
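
The S3AFileSystem hunks remove both the field read at initialize() time and the public getter the committers queried. A sketch of the removed wiring, with names mirroring the deleted lines above:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch of the removed wiring: read the flag once during
// initialization and expose it to collaborators (committers and the
// block output stream builder).
public class MultipartFlagInitSketch {
  private boolean isMultipartUploadEnabled;

  public void initialize(Configuration conf) {
    this.isMultipartUploadEnabled =
        conf.getBoolean("fs.s3a.multipart.uploads.enabled", true);
  }

  public boolean isMultipartUploadEnabled() {
    return isMultipartUploadEnabled;
  }
}
```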

View File

@@ -1547,7 +1547,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
* of block uploads pending (1) and the bytes pending (blockSize).
*/
@Override
public void blockUploadQueued(long blockSize) {
public void blockUploadQueued(int blockSize) {
incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS);
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize);
@@ -1560,7 +1560,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
* {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}.
*/
@Override
public void blockUploadStarted(Duration timeInQueue, long blockSize) {
public void blockUploadStarted(Duration timeInQueue, int blockSize) {
// the local counter is used in toString reporting.
queueDuration.addAndGet(timeInQueue.toMillis());
// update the duration fields in the IOStatistics.
@@ -1588,7 +1588,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
@Override
public void blockUploadCompleted(
Duration timeSinceUploadStarted,
long blockSize) {
int blockSize) {
transferDuration.addAndGet(timeSinceUploadStarted.toMillis());
incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
blockUploadsCompleted.incrementAndGet();
@@ -1602,7 +1602,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
@Override
public void blockUploadFailed(
Duration timeSinceUploadStarted,
long blockSize) {
int blockSize) {
incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS);
}
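
These instrumentation hunks only narrow the callback parameters from long back to int; the callback sequence itself is unchanged. A sketch of how an upload drives that sequence, using a trimmed stand-in for the real BlockOutputStreamStatistics interface:

```java
import java.time.Duration;
import java.time.Instant;

public class BlockUploadStatsSketch {
  /** Trimmed stand-in for BlockOutputStreamStatistics. */
  interface Stats {
    void blockUploadQueued(int blockSize);
    void blockUploadStarted(Duration timeInQueue, int blockSize);
    void blockUploadCompleted(Duration sinceStart, int blockSize);
    void blockUploadFailed(Duration sinceStart, int blockSize);
  }

  static void upload(Stats stats, int blockSize, Runnable putPart) {
    stats.blockUploadQueued(blockSize);        // pending gauges go up
    Instant queued = Instant.now();
    // ... wait for an executor slot ...
    Instant started = Instant.now();
    stats.blockUploadStarted(Duration.between(queued, started), blockSize);
    try {
      putPart.run();                           // the actual part upload
      stats.blockUploadCompleted(
          Duration.between(started, Instant.now()), blockSize);
    } catch (RuntimeException e) {
      stats.blockUploadFailed(
          Duration.between(started, Instant.now()), blockSize);
      throw e;
    }
  }
}
```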

View File

@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
@@ -1027,38 +1026,6 @@ public final class S3AUtils {
return partSize;
}
/**
* Validates the output stream configuration.
* @param path path: for error messages
* @param conf : configuration object for the given context
* @throws PathIOException Unsupported configuration.
*/
public static void validateOutputStreamConfiguration(final Path path,
Configuration conf) throws PathIOException {
if(!checkDiskBuffer(conf)){
throw new PathIOException(path.toString(),
"Unable to create OutputStream with the given"
+ " multipart upload and buffer configuration.");
}
}
/**
* Check whether the configuration for S3ABlockOutputStream is
* consistent or not. Multipart uploads allow all kinds of fast buffers to
* be supported. When the option is disabled only disk buffers are allowed to
* be used as the file size might be bigger than the buffer size that can be
* allocated.
* @param conf : configuration object for the given context
* @return true if the disk buffer and the multipart settings are supported
*/
public static boolean checkDiskBuffer(Configuration conf) {
boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
return isMultipartUploadEnabled
|| FAST_UPLOAD_BUFFER_DISK.equals(
conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
}
/**
* Ensure that the long value is in the range of an integer.
* @param name property name for error messages
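
The deleted checkDiskBuffer() encoded one rule: with multipart uploads disabled, only disk buffering is safe, since an array or byte-buffer block would have to hold the whole file in memory. A sketch of the accepted and rejected combinations, assuming the usual values of the constants referenced above ("fs.s3a.fast.upload.buffer", default "disk"):

```java
import org.apache.hadoop.conf.Configuration;

public class DiskBufferCheckSketch {
  // Re-implementation of the deleted check, for illustration only.
  static boolean checkDiskBuffer(Configuration conf) {
    boolean multipart =
        conf.getBoolean("fs.s3a.multipart.uploads.enabled", true);
    return multipart
        || "disk".equals(conf.get("fs.s3a.fast.upload.buffer", "disk"));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", false);
    conf.set("fs.s3a.fast.upload.buffer", "array");
    System.out.println(checkDiskBuffer(conf)); // false: rejected
    conf.set("fs.s3a.fast.upload.buffer", "disk");
    System.out.println(checkDiskBuffer(conf)); // true: allowed
  }
}
```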

View File

@@ -269,6 +269,8 @@ public class WriteOperationHelper implements WriteOperations {
String dest,
File sourceFile,
final PutObjectOptions options) {
Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
"File length is too big for a single PUT upload");
activateAuditSpan();
final ObjectMetadata objectMetadata =
newObjectMetadata((int) sourceFile.length());
@@ -530,7 +532,7 @@ public class WriteOperationHelper implements WriteOperations {
String destKey,
String uploadId,
int partNumber,
long size,
int size,
InputStream uploadStream,
File sourceFile,
Long offset) throws IOException {
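
The restored Preconditions check above is the flip side of the int sizes: File.length() returns a long, but the single-PUT request path narrows it to int (note the (int) cast into newObjectMetadata), so files of 2 GiB or more must be rejected up front. A dependency-free sketch of the same guard:

```java
import java.io.File;

public class SinglePutGuardSketch {
  // Stand-in for the restored Preconditions.checkState call; the
  // IllegalStateException mirrors what checkState throws.
  static void checkSinglePut(File sourceFile) {
    if (sourceFile.length() >= Integer.MAX_VALUE) {
      throw new IllegalStateException(
          "File length is too big for a single PUT upload");
    }
  }
}
```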

View File

@@ -233,7 +233,7 @@ public interface WriteOperations extends AuditSpanSource, Closeable {
String destKey,
String uploadId,
int partNumber,
long size,
int size,
InputStream uploadStream,
File sourceFile,
Long offset) throws IOException;

View File

@@ -196,11 +196,10 @@ public interface RequestFactory {
* @param destKey destination object key
* @param options options for the request
* @return the request.
* @throws PathIOException if multipart uploads are disabled
*/
InitiateMultipartUploadRequest newMultipartUploadRequest(
String destKey,
@Nullable PutObjectOptions options) throws PathIOException;
@Nullable PutObjectOptions options);
/**
* Complete a multipart upload.
@@ -249,7 +248,7 @@ public interface RequestFactory {
String destKey,
String uploadId,
int partNumber,
long size,
int size,
InputStream uploadStream,
File sourceFile,
long offset) throws PathIOException;

View File

@@ -217,10 +217,6 @@ public abstract class AbstractS3ACommitter extends PathOutputCommitter
LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
role, jobName(context), jobIdString(context), outputPath);
S3AFileSystem fs = getDestS3AFS();
if (!fs.isMultipartUploadEnabled()) {
throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem,"
+ " the committer can't proceed.");
}
// set this thread's context with the job ID.
// audit spans created in this thread will pick
up this value, including the commit operations instance
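
The guard deleted above made committer construction fail fast, since the magic and staging commit protocols are built on multipart uploads. A hypothetical illustration of the job-side consequence before the revert (key string from the removed constant; after the revert the flag no longer exists, so the guard is moot):

```java
import org.apache.hadoop.conf.Configuration;

public class CommitterConfigSketch {
  // Jobs using the S3A committers had to keep multipart uploads on,
  // or construction failed with a PathCommitException.
  public static Configuration committerSafeConf(Configuration base) {
    Configuration conf = new Configuration(base);
    conf.setBoolean("fs.s3a.multipart.uploads.enabled", true); // default
    return conf;
  }
}
```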

View File

@@ -124,11 +124,6 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private final StorageClass storageClass;
/**
* Is multipart upload enabled.
*/
private final boolean isMultipartUploadEnabled;
/**
* Constructor.
* @param builder builder with all the configuration.
@@ -142,7 +137,6 @@ public class RequestFactoryImpl implements RequestFactory {
this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding;
this.storageClass = builder.storageClass;
this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
}
/**
@@ -466,10 +460,7 @@ public class RequestFactoryImpl implements RequestFactory {
@Override
public InitiateMultipartUploadRequest newMultipartUploadRequest(
final String destKey,
@Nullable final PutObjectOptions options) throws PathIOException {
if (!isMultipartUploadEnabled) {
throw new PathIOException(destKey, "Multipart uploads are disabled.");
}
@Nullable final PutObjectOptions options) {
final ObjectMetadata objectMetadata = newObjectMetadata(-1);
maybeSetMetadata(options, objectMetadata);
final InitiateMultipartUploadRequest initiateMPURequest =
@@ -518,7 +509,7 @@ public class RequestFactoryImpl implements RequestFactory {
String destKey,
String uploadId,
int partNumber,
long size,
int size,
InputStream uploadStream,
File sourceFile,
long offset) throws PathIOException {
@@ -691,11 +682,6 @@ public class RequestFactoryImpl implements RequestFactory {
*/
private PrepareRequest requestPreparer;
/**
* Is Multipart Enabled on the path.
*/
private boolean isMultipartUploadEnabled = true;
private RequestFactoryBuilder() {
}
@@ -781,18 +767,6 @@ public class RequestFactoryImpl implements RequestFactory {
this.requestPreparer = value;
return this;
}
/**
* Multipart upload enabled.
*
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withMultipartUploadEnabled(
final boolean value) {
this.isMultipartUploadEnabled = value;
return this;
}
}
/**
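
The builder hook deleted above was how the flag reached the request factory, letting newMultipartUploadRequest() fail fast with a PathIOException. A sketch of its use before the revert, assuming the factory's existing builder()/withBucket()/build() methods; withMultipartUploadEnabled is the method removed here:

```java
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;

public class RequestFactorySketch {
  static RequestFactory disabledMultipartFactory() {
    return RequestFactoryImpl.builder()
        .withBucket("example-bucket")        // hypothetical bucket name
        .withMultipartUploadEnabled(false)   // the hook removed above
        .build();
  }
  // With the flag off, newMultipartUploadRequest threw PathIOException;
  // after this revert that failure mode is gone.
}
```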

View File

@@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable,
* Block is queued for upload.
* @param blockSize block size.
*/
void blockUploadQueued(long blockSize);
void blockUploadQueued(int blockSize);
/**
* Queued block has been scheduled for upload.
* @param timeInQueue time in the queue.
* @param blockSize block size.
*/
void blockUploadStarted(Duration timeInQueue, long blockSize);
void blockUploadStarted(Duration timeInQueue, int blockSize);
/**
* A block upload has completed. Duration excludes time in the queue.
* @param timeSinceUploadStarted time in since the transfer began.
* @param blockSize block size
*/
void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize);
void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize);
/**
* A block upload has failed. Duration excludes time in the queue.
@@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable,
* @param timeSinceUploadStarted time in since the transfer began.
* @param blockSize block size
*/
void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize);
void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize);
/**
* Intermediate report of bytes uploaded.

View File

@@ -442,22 +442,22 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
implements BlockOutputStreamStatistics {
@Override
public void blockUploadQueued(final long blockSize) {
public void blockUploadQueued(final int blockSize) {
}
@Override
public void blockUploadStarted(final Duration timeInQueue,
final long blockSize) {
final int blockSize) {
}
@Override
public void blockUploadCompleted(final Duration timeSinceUploadStarted,
final long blockSize) {
final int blockSize) {
}
@Override
public void blockUploadFailed(final Duration timeSinceUploadStarted,
final long blockSize) {
final int blockSize) {
}
@Override

View File

@@ -1723,9 +1723,7 @@ The "fast" output stream
1. Uploads large files as blocks with the size set by
`fs.s3a.multipart.size`. That is: the threshold at which multipart uploads
begin and the size of each upload are identical. This behavior can be enabled
or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by
default is set to true.
begin and the size of each upload are identical.
1. Buffers blocks to disk (default) or in on-heap or off-heap memory.
1. Uploads blocks in parallel in background threads.
1. Begins uploading blocks as soon as the buffered data exceeds this partition

View File

@ -200,11 +200,6 @@ public class MockS3AFileSystem extends S3AFileSystem {
return true;
}
@Override
public boolean isMultipartUploadEnabled() {
return true;
}
/**
* Make operation to set the s3 client public.
* @param client client.

View File

@@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.magic;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Verify that the magic committer cannot be created if the FS doesn't support multipart
* uploads.
*/
public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
MAGIC_COMMITTER_ENABLED,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
MULTIPART_UPLOADS_ENABLED);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC);
return conf;
}
@Test
public void testCreateCommitter() throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = methodPath();
LOG.debug("Trying to create a committer on the path: {}", commitPath);
intercept(PathCommitException.class,
() -> new MagicS3GuardCommitter(commitPath, tContext));
}
}

View File

@@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit.staging.integration;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Verify that a staging committer cannot be created if the FS doesn't support multipart
* uploads.
*/
public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
S3A_COMMITTER_FACTORY_KEY,
FS_S3A_COMMITTER_NAME,
MULTIPART_UPLOADS_ENABLED);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY);
conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING);
return conf;
}
@Test
public void testCreateCommitter() throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(),
new TaskAttemptID());
Path commitPath = methodPath();
LOG.debug("Trying to create a committer on the path: {}", commitPath);
intercept(PathCommitException.class,
() -> new StagingCommitter(commitPath, tContext));
}
}

View File

@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a.impl;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
@@ -156,7 +155,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
* Create objects through the factory.
* @param factory factory
*/
private void createFactoryObjects(RequestFactory factory) throws IOException {
private void createFactoryObjects(RequestFactory factory) {
String path = "path";
String path2 = "path2";
String id = "1";

View File

@@ -1,89 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
/**
* Test a file upload using a single PUT operation. Multipart uploads will
* be disabled in the test.
*/
public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase {
public static final Logger LOG = LoggerFactory.getLogger(
ITestS3AHugeFileUploadSinglePut.class);
private long fileSize;
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
removeBucketOverrides(getTestBucketName(conf), conf,
FAST_UPLOAD_BUFFER,
IO_CHUNK_BUFFER_SIZE,
KEY_HUGE_FILESIZE,
MULTIPART_UPLOADS_ENABLED,
MULTIPART_SIZE,
REQUEST_TIMEOUT);
conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false);
fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE,
DEFAULT_HUGE_FILESIZE);
// set a small part size to verify it does not impact block allocation size
conf.setLong(MULTIPART_SIZE, 10_000);
conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.set(REQUEST_TIMEOUT, "1h");
return conf;
}
@Test
public void uploadFileSinglePut() throws IOException {
LOG.info("Creating file with size : {}", fileSize);
S3AFileSystem fs = getFileSystem();
ContractTestUtils.createAndVerifyFile(fs,
methodPath(), fileSize);
// Exactly three put requests should be made during the upload of the file
// First one being the creation of the directory marker
// Second being the creation of the test file
// Third being the creation of directory marker on the file delete
assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
.isEqualTo(3);
}
}