From eb749ddd4ddd700d9d6afaefb6d44bc3faecb724 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 27 Apr 2023 10:59:46 +0100 Subject: [PATCH] HADOOP-18695. S3A: reject multipart copy requests when disabled (#5548) Contributed by Steve Loughran. --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 7 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 35 ++- .../hadoop/fs/s3a/S3AInstrumentation.java | 8 +- .../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 6 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 8 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 32 +-- .../apache/hadoop/fs/s3a/WriteOperations.java | 22 +- .../s3a/api/UnsupportedRequestException.java | 41 ++++ .../fs/s3a/audit/AWSRequestAnalyzer.java | 14 ++ .../hadoop/fs/s3a/audit/AuditIntegration.java | 18 ++ .../AuditOperationRejectedException.java | 33 +++ .../fs/s3a/audit/impl/LoggingAuditor.java | 19 ++ .../fs/s3a/commit/impl/CommitOperations.java | 2 +- .../s3a/commit/magic/MagicCommitTracker.java | 2 +- .../fs/s3a/impl/S3AMultipartUploader.java | 13 +- .../S3AMultipartUploaderStatisticsImpl.java | 9 +- .../hadoop/fs/s3a/ITestS3AMiscOperations.java | 2 +- .../hadoop/fs/s3a/MultipartTestUtils.java | 2 +- .../fs/s3a/audit/TestAuditIntegration.java | 20 ++ .../magic/ITestS3AHugeMagicCommits.java | 20 +- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 209 ++++++++++++++---- .../scale/ITestS3ADirectoryPerformance.java | 2 +- .../ITestS3AHugeFileUploadSinglePut.java | 89 -------- .../scale/ITestS3AHugeFilesNoMultipart.java | 111 ++++++++++ 24 files changed, 529 insertions(+), 195 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index df3c9315ba8..43a2b7e0dbd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -614,7 +614,10 @@ class S3ABlockOutputStream extends OutputStream implements try { // the putObject call automatically closes the input // stream afterwards. - return writeOperationHelper.putObject(putObjectRequest, builder.putOptions); + return writeOperationHelper.putObject( + putObjectRequest, + builder.putOptions, + statistics); } finally { cleanupWithLogger(LOG, uploadData, block); } @@ -897,7 +900,7 @@ class S3ABlockOutputStream extends OutputStream implements try { LOG.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId); - PartETag partETag = writeOperationHelper.uploadPart(request) + PartETag partETag = writeOperationHelper.uploadPart(request, statistics) .getPartETag(); LOG.debug("Completed upload of {} to part {}", block, partETag.getETag()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 30b2813caf7..999186f8cd5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2603,6 +2603,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, : null; } + /** + * Given a possibly null duration tracker factory, return a non-null + * one for use in tracking durations -either that or the FS tracker + * itself. + * + * @param factory factory. + * @return a non-null factory. + */ + protected DurationTrackerFactory nonNullDurationTrackerFactory( + DurationTrackerFactory factory) { + return factory != null + ? factory + : getDurationTrackerFactory(); + } + /** * Request object metadata; increments counters in the process. * Retry policy: retry untranslated. @@ -2961,20 +2976,22 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * Important: this call will close any input stream in the request. * @param putObjectRequest the request * @param putOptions put object options + * @param durationTrackerFactory factory for duration tracking * @return the upload initiated * @throws AmazonClientException on problems */ @VisibleForTesting @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed") PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions) + PutObjectOptions putOptions, + DurationTrackerFactory durationTrackerFactory) throws AmazonClientException { long len = getPutRequestLength(putObjectRequest); LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey()); incrementPutStartStatistics(len); try { PutObjectResult result = trackDurationOfSupplier( - getDurationTrackerFactory(), + nonNullDurationTrackerFactory(durationTrackerFactory), OBJECT_PUT_REQUESTS.getSymbol(), () -> s3.putObject(putObjectRequest)); incrementPutCompletedStatistics(true, len); @@ -3013,16 +3030,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * * Retry Policy: none. * @param request request + * @param durationTrackerFactory duration tracker factory for operation * @return the result of the operation. * @throws AmazonClientException on problems */ @Retries.OnceRaw - UploadPartResult uploadPart(UploadPartRequest request) + UploadPartResult uploadPart(UploadPartRequest request, + final DurationTrackerFactory durationTrackerFactory) throws AmazonClientException { long len = request.getPartSize(); incrementPutStartStatistics(len); try { - UploadPartResult uploadPartResult = s3.uploadPart(request); + UploadPartResult uploadPartResult = trackDurationOfSupplier( + nonNullDurationTrackerFactory(durationTrackerFactory), + MULTIPART_UPLOAD_PART_PUT.getSymbol(), () -> + s3.uploadPart(request)); incrementPutCompletedStatistics(true, len); return uploadPartResult; } catch (AmazonClientException e) { @@ -4435,8 +4457,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, throws IOException { invoker.retry("PUT 0-byte object ", objectName, true, () -> - putObjectDirect(getRequestFactory() - .newDirectoryMarkerRequest(objectName), putOptions)); + putObjectDirect(getRequestFactory().newDirectoryMarkerRequest(objectName), + putOptions, + getDurationTrackerFactory())); incrementPutProgressStatistics(objectName, 0); instrumentation.directoryCreated(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index da12223570e..bd867859e02 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1441,9 +1441,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, final IOStatisticsStore sourceIOStatistics = source.getIOStatistics(); this.getIOStatistics().aggregate(sourceIOStatistics); - // propagate any extra values into the FS-level stats. - incrementMutableCounter(OBJECT_PUT_REQUESTS.getSymbol(), - sourceIOStatistics.counters().get(OBJECT_PUT_REQUESTS.getSymbol())); + // propagate any extra values into the FS-level stats; incrementMutableCounter( COMMITTER_MAGIC_MARKER_PUT.getSymbol(), sourceIOStatistics.counters().get(COMMITTER_MAGIC_MARKER_PUT.getSymbol())); @@ -1507,6 +1505,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource, COMMITTER_MAGIC_MARKER_PUT.getSymbol(), INVOCATION_ABORT.getSymbol(), MULTIPART_UPLOAD_COMPLETED.getSymbol(), + MULTIPART_UPLOAD_PART_PUT.getSymbol(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), OBJECT_PUT_REQUESTS.getSymbol()) @@ -1773,7 +1772,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource, COMMITTER_COMMIT_JOB.getSymbol(), COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(), COMMITTER_MATERIALIZE_FILE.getSymbol(), - COMMITTER_STAGE_FILE_UPLOAD.getSymbol()) + COMMITTER_STAGE_FILE_UPLOAD.getSymbol(), + OBJECT_PUT_REQUESTS.getSymbol()) .build(); setIOStatistics(st); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index 30427f7672a..528a99f5e09 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -31,17 +31,18 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonClientException; -import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException; import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.ConnectTimeoutException; +import org.apache.hadoop.util.Preconditions; import static org.apache.hadoop.io.retry.RetryPolicies.*; @@ -228,6 +229,9 @@ public class S3ARetryPolicy implements RetryPolicy { policyMap.put(AWSS3IOException.class, retryIdempotentCalls); policyMap.put(SocketTimeoutException.class, retryIdempotentCalls); + // Unsupported requests do not work, however many times you try + policyMap.put(UnsupportedRequestException.class, fail); + return policyMap; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 274bc96fb99..e22433322c9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -44,6 +44,8 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.fs.s3a.audit.AuditFailureException; +import org.apache.hadoop.fs.s3a.audit.AuditIntegration; import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider; import org.apache.hadoop.fs.s3a.impl.NetworkBinding; @@ -203,10 +205,14 @@ public final class S3AUtils { // call considered an sign of connectivity failure return (EOFException)new EOFException(message).initCause(exception); } + // if the exception came from the auditor, hand off translation + // to it. + if (exception instanceof AuditFailureException) { + return AuditIntegration.translateAuditException(path, (AuditFailureException) exception); + } if (exception instanceof CredentialInitializationException) { // the exception raised by AWSCredentialProvider list if the // credentials were not accepted, - // or auditing blocked the operation. return (AccessDeniedException)new AccessDeniedException(path, null, exception.toString()).initCause(exception); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 7f9db33157f..8e15a10944c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -52,6 +52,7 @@ import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.s3a.select.SelectBinding; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.util.DurationInfo; @@ -564,36 +565,19 @@ public class WriteOperationHelper implements WriteOperations { * file, from the content length of the header. * @param putObjectRequest the request * @param putOptions put object options + * @param durationTrackerFactory factory for duration tracking * @return the upload initiated * @throws IOException on problems */ @Retries.RetryTranslated public PutObjectResult putObject(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions) + PutObjectOptions putOptions, + DurationTrackerFactory durationTrackerFactory) throws IOException { return retry("Writing Object", putObjectRequest.getKey(), true, withinAuditSpan(getAuditSpan(), () -> - owner.putObjectDirect(putObjectRequest, putOptions))); - } - - /** - * PUT an object. - * - * @param putObjectRequest the request - * @param putOptions put object options - * - * @throws IOException on problems - */ - @Retries.RetryTranslated - public void uploadObject(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions) - throws IOException { - - retry("Writing Object", - putObjectRequest.getKey(), true, - withinAuditSpan(getAuditSpan(), () -> - owner.putObjectDirect(putObjectRequest, putOptions))); + owner.putObjectDirect(putObjectRequest, putOptions, durationTrackerFactory))); } /** @@ -650,18 +634,20 @@ public class WriteOperationHelper implements WriteOperations { /** * Upload part of a multi-partition file. * @param request request + * @param durationTrackerFactory duration tracker factory for operation * @return the result of the operation. * @throws IOException on problems */ @Retries.RetryTranslated - public UploadPartResult uploadPart(UploadPartRequest request) + public UploadPartResult uploadPart(UploadPartRequest request, + final DurationTrackerFactory durationTrackerFactory) throws IOException { return retry("upload part #" + request.getPartNumber() + " upload ID " + request.getUploadId(), request.getKey(), true, withinAuditSpan(getAuditSpan(), - () -> owner.uploadPart(request))); + () -> owner.uploadPart(request, durationTrackerFactory))); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 32888314d88..bd6e5ac1473 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.store.audit.AuditSpanSource; import org.apache.hadoop.util.functional.CallableRaisingIOE; @@ -244,25 +245,14 @@ public interface WriteOperations extends AuditSpanSource, Closeable { * file, from the content length of the header. * @param putObjectRequest the request * @param putOptions put object options + * @param durationTrackerFactory factory for duration tracking * @return the upload initiated * @throws IOException on problems */ @Retries.RetryTranslated PutObjectResult putObject(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions) - throws IOException; - - /** - * PUT an object via the transfer manager. - * - * @param putObjectRequest the request - * @param putOptions put object options - * - * @throws IOException on problems - */ - @Retries.RetryTranslated - void uploadObject(PutObjectRequest putObjectRequest, - PutObjectOptions putOptions) + PutObjectOptions putOptions, + DurationTrackerFactory durationTrackerFactory) throws IOException; /** @@ -299,11 +289,13 @@ public interface WriteOperations extends AuditSpanSource, Closeable { /** * Upload part of a multi-partition file. * @param request request + * @param durationTrackerFactory factory for duration tracking * @return the result of the operation. * @throws IOException on problems */ @Retries.RetryTranslated - UploadPartResult uploadPart(UploadPartRequest request) + UploadPartResult uploadPart(UploadPartRequest request, + DurationTrackerFactory durationTrackerFactory) throws IOException; /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java new file mode 100644 index 00000000000..36b7148f00d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/UnsupportedRequestException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.api; + +import org.apache.hadoop.fs.PathIOException; + +/** + * An operation is unsupported. + */ +public class UnsupportedRequestException extends PathIOException { + + public UnsupportedRequestException(final String path, final Throwable cause) { + super(path, cause); + } + + public UnsupportedRequestException(final String path, final String error) { + super(path, error); + } + + public UnsupportedRequestException(final String path, + final String error, + final Throwable cause) { + super(path, error, cause); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index f2963d73192..b4be341c912 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -217,6 +217,20 @@ public class AWSRequestAnalyzer { || request instanceof GetBucketLocationRequest; } + /** + * Predicate which returns true if the request is part of the + * multipart upload API -and which therefore must be rejected + * if multipart upload is disabled. + * @param request request + * @return true if the transfer manager creates them. + */ + public static boolean isRequestMultipartIO(final Object request) { + return request instanceof CopyPartRequest + || request instanceof CompleteMultipartUploadRequest + || request instanceof InitiateMultipartUploadRequest + || request instanceof UploadPartRequest; + } + /** * Info about a request. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java index bc964b430eb..c66f45eb309 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditIntegration.java @@ -21,12 +21,14 @@ package org.apache.hadoop.fs.s3a.audit; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.file.AccessDeniedException; import com.amazonaws.HandlerContextAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException; import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A; import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor; import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditManagerS3A; @@ -142,4 +144,20 @@ public final class AuditIntegration { request.addHandlerContext(AUDIT_SPAN_HANDLER_CONTEXT, span); } + /** + * Translate an audit exception. + * @param path path of operation. + * @param exception exception + * @return the IOE to raise. + */ + public static IOException translateAuditException(String path, + AuditFailureException exception) { + if (exception instanceof AuditOperationRejectedException) { + // special handling of this subclass + return new UnsupportedRequestException(path, + exception.getMessage(), exception); + } + return (AccessDeniedException)new AccessDeniedException(path, null, + exception.toString()).initCause(exception); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java new file mode 100644 index 00000000000..f93313b261f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditOperationRejectedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +/** + * The auditor has rejected the operation as forbidden/unavailable. + */ +public class AuditOperationRejectedException extends AuditFailureException { + + public AuditOperationRejectedException(final String message, final Throwable t) { + super(message, t); + } + + public AuditOperationRejectedException(final String message) { + super(message); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index feb926a0bfc..6d6bd935432 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.audit.AuditConstants; import org.apache.hadoop.fs.audit.CommonAuditContext; import org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer; import org.apache.hadoop.fs.s3a.audit.AuditFailureException; +import org.apache.hadoop.fs.s3a.audit.AuditOperationRejectedException; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader; @@ -46,6 +47,9 @@ import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_THREAD0; import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TIMESTAMP; import static org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext; import static org.apache.hadoop.fs.audit.CommonAuditContext.currentThreadID; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MULTIPART_UPLOAD_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO; import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED; @@ -112,6 +116,12 @@ public class LoggingAuditor */ private Collection filters; + /** + * Does the S3A FS instance being audited have multipart upload enabled? + * If not: fail if a multipart upload is initiated. + */ + private boolean isMultipartUploadEnabled; + /** * Log for warning of problems getting the range of GetObjectRequest * will only log of a problem once per process instance. @@ -164,6 +174,8 @@ public class LoggingAuditor final CommonAuditContext currentContext = currentAuditContext(); warningSpan = new WarningSpan(OUTSIDE_SPAN, currentContext, createSpanID(), null, null); + isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + DEFAULT_MULTIPART_UPLOAD_ENABLED); } @Override @@ -173,6 +185,7 @@ public class LoggingAuditor sb.append("ID='").append(getAuditorId()).append('\''); sb.append(", headerEnabled=").append(headerEnabled); sb.append(", rejectOutOfSpan=").append(rejectOutOfSpan); + sb.append(", isMultipartUploadEnabled=").append(isMultipartUploadEnabled); sb.append('}'); return sb.toString(); } @@ -363,6 +376,12 @@ public class LoggingAuditor analyzer.analyze(request), header); } + // now see if the request is actually a blocked multipart request + if (!isMultipartUploadEnabled && isRequestMultipartIO(request)) { + throw new AuditOperationRejectedException("Multipart IO request " + + request + " rejected " + header); + } + return request; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java index eb23f299a02..ef56d829781 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java @@ -583,7 +583,7 @@ public class CommitOperations extends AbstractStoreOperation localFile, offset); part.setLastPart(partNumber == numParts); - UploadPartResult partResult = writeOperations.uploadPart(part); + UploadPartResult partResult = writeOperations.uploadPart(part, statistics); offset += uploadPartSize; parts.add(partResult.getPartETag()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java index c85571a1949..1a5451df801 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -185,7 +185,7 @@ public class MagicCommitTracker extends PutTracker { private void upload(PutObjectRequest request) throws IOException { trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(), () -> - writer.uploadObject(request, PutObjectOptions.keepingDirs())); + writer.putObject(request, PutObjectOptions.keepingDirs(), null)); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java index 3a6a04e5e71..4ab5bc6a992 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java @@ -57,7 +57,10 @@ import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.util.Preconditions; +import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfCallable; /** * MultipartUploader for S3AFileSystem. This uses the S3 multipart @@ -122,13 +125,13 @@ class S3AMultipartUploader extends AbstractMultipartUploader { checkPath(dest); String key = context.pathToKey(dest); return context.submit(new CompletableFuture<>(), - () -> { + trackDurationOfCallable(statistics, OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), () -> { String uploadId = writeOperations.initiateMultiPartUpload(key, PutObjectOptions.keepingDirs()); statistics.uploadStarted(); return BBUploadHandle.from(ByteBuffer.wrap( uploadId.getBytes(Charsets.UTF_8))); - }); + })); } @Override @@ -152,7 +155,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader { UploadPartRequest request = writeOperations.newUploadPartRequest(key, uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L); - UploadPartResult result = writeOperations.uploadPart(request); + UploadPartResult result = writeOperations.uploadPart(request, statistics); statistics.partPut(lengthInBytes); String eTag = result.getETag(); return BBPartHandle.from( @@ -206,7 +209,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader { // retrieve/create operation state for scalability of completion. long finalLen = totalLength; return context.submit(new CompletableFuture<>(), - () -> { + trackDurationOfCallable(statistics, MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> { CompleteMultipartUploadResult result = writeOperations.commitUpload( key, @@ -218,7 +221,7 @@ class S3AMultipartUploader extends AbstractMultipartUploader { byte[] eTag = result.getETag().getBytes(Charsets.UTF_8); statistics.uploadCompleted(); return (PathHandle) () -> ByteBuffer.wrap(eTag); - }); + })); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java index 7b6d559cf20..b35e4421d72 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/S3AMultipartUploaderStatisticsImpl.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_INSTANTIATED; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT_BYTES; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_STARTED; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_INITIATED; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; /** @@ -73,8 +74,11 @@ public final class S3AMultipartUploaderStatisticsImpl MULTIPART_UPLOAD_PART_PUT_BYTES.getSymbol(), MULTIPART_UPLOAD_ABORTED.getSymbol(), MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED.getSymbol(), - MULTIPART_UPLOAD_COMPLETED.getSymbol(), MULTIPART_UPLOAD_STARTED.getSymbol()) + .withDurationTracking( + MULTIPART_UPLOAD_COMPLETED.getSymbol(), + OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), + MULTIPART_UPLOAD_PART_PUT.getSymbol()) .build(); setIOStatistics(st); } @@ -96,13 +100,12 @@ public final class S3AMultipartUploaderStatisticsImpl @Override public void partPut(final long lengthInBytes) { - inc(MULTIPART_UPLOAD_PART_PUT, 1); inc(MULTIPART_UPLOAD_PART_PUT_BYTES, lengthInBytes); } @Override public void uploadCompleted() { - inc(MULTIPART_UPLOAD_COMPLETED, 1); + // duration tracking updates the statistics } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 2d29282ad01..1a944ec2996 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -114,7 +114,7 @@ public class ITestS3AMiscOperations extends AbstractS3ATestBase { new ByteArrayInputStream("PUT".getBytes()), metadata); LambdaTestUtils.intercept(IllegalStateException.class, - () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs())); + () -> fs.putObjectDirect(put, PutObjectOptions.keepingDirs(), null)); assertPathDoesNotExist("put object was created", path); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java index 9ea33cf69c9..1ddff3c4cd5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -82,7 +82,7 @@ public final class MultipartTestUtils { String uploadId = writeHelper.initiateMultiPartUpload(key, PutObjectOptions.keepingDirs()); UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId, partNo, len, in, null, 0L); - PartETag partEtag = writeHelper.uploadPart(req).getPartETag(); + PartETag partEtag = writeHelper.uploadPart(req, null).getPartETag(); LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId); return new IdKey(key, uploadId); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java index bd552b91aad..7cdab4c4b75 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditIntegration.java @@ -25,13 +25,17 @@ import java.util.List; import com.amazonaws.DefaultRequest; import com.amazonaws.handlers.RequestHandler2; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3ARetryPolicy; import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException; import org.apache.hadoop.fs.s3a.audit.impl.NoopAuditor; import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.service.Service; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -67,6 +71,22 @@ public class TestAuditIntegration extends AbstractHadoopTestBase { }); } + /** + * UnsupportedRequest mapping and fail fast outcome. + */ + @Test + public void testUnsupportedExceptionTranslation() throws Throwable { + final UnsupportedRequestException ex = intercept(UnsupportedRequestException.class, () -> { + throw translateException("test", "/", + new AuditOperationRejectedException("not supported")); + }); + final S3ARetryPolicy retryPolicy = new S3ARetryPolicy(new Configuration(false)); + final RetryPolicy.RetryAction action = retryPolicy.shouldRetry(ex, 0, 0, true); + Assertions.assertThat(action.action) + .describedAs("retry policy %s for %s", action, ex) + .isEqualTo(RetryPolicy.RetryAction.RetryDecision.FAIL); + } + /** * Create a no-op auditor. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java index aeea195aacf..e1440a497b0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.commit.magic; import java.io.IOException; import java.util.List; +import java.util.Map; import org.assertj.core.api.Assertions; import org.slf4j.Logger; @@ -42,6 +43,7 @@ import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.extractXAttrLongValue; /** @@ -67,6 +69,8 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles { /** The file with the JSON data about the commit. */ private Path pendingDataFile; + private Path finalDirectory; + /** * Use fast upload on disk. * @return the upload buffer mechanism. @@ -84,13 +88,18 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles { return "ITestS3AHugeMagicCommits"; } + @Override + protected boolean expectImmediateFileVisibility() { + return false; + } + @Override public void setup() throws Exception { super.setup(); CommitUtils.verifyIsMagicCommitFS(getFileSystem()); // set up the paths for the commit operation - Path finalDirectory = new Path(getScaleTestDir(), "commit"); + finalDirectory = new Path(getScaleTestDir(), "commit"); magicDir = new Path(finalDirectory, MAGIC); jobDir = new Path(magicDir, "job_001"); String filename = "commit.bin"; @@ -120,6 +129,15 @@ public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles { FileStatus status = fs.getFileStatus(magicOutputFile); assertEquals("Non empty marker file " + status, 0, status.getLen()); + final Map xAttr = fs.getXAttrs(magicOutputFile); + final String header = XA_MAGIC_MARKER; + Assertions.assertThat(xAttr) + .describedAs("Header %s of %s", header, magicOutputFile) + .containsKey(header); + Assertions.assertThat(extractXAttrLongValue(xAttr.get(header))) + .describedAs("Decoded header %s of %s", header, magicOutputFile) + .get() + .isEqualTo(getFilesize()); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); CommitOperations operations = new CommitOperations(fs); Path destDir = getHugefile().getParent(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index f8d47011de3..c4949375b76 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntFunction; @@ -51,17 +50,29 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.Progressable; +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; -import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS; /** * Scale test which creates a huge file. @@ -76,9 +87,10 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSo */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger( AbstractSTestS3AHugeFiles.class); - public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB; + public static final int DEFAULT_UPLOAD_BLOCKSIZE = 128 * _1KB; private Path scaleTestDir; private Path hugefile; @@ -94,6 +106,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { scaleTestDir = new Path(getTestPath(), getTestSuiteName()); hugefile = new Path(scaleTestDir, "hugefile"); hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed"); + uploadBlockSize = uploadBlockSize(); filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, DEFAULT_HUGE_FILESIZE); } @@ -117,12 +130,22 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { partitionSize = (int) getTestPropertyBytes(conf, KEY_HUGE_PARTITION_SIZE, DEFAULT_HUGE_PARTITION_SIZE); - assertTrue("Partition size too small: " + partitionSize, - partitionSize >= MULTIPART_MIN_SIZE); + Assertions.assertThat(partitionSize) + .describedAs("Partition size set in " + KEY_HUGE_PARTITION_SIZE) + .isGreaterThanOrEqualTo(MULTIPART_MIN_SIZE); + removeBaseAndBucketOverrides(conf, + SOCKET_SEND_BUFFER, + SOCKET_RECV_BUFFER, + MIN_MULTIPART_THRESHOLD, + MULTIPART_SIZE, + USER_AGENT_PREFIX, + FAST_UPLOAD_BUFFER); + conf.setLong(SOCKET_SEND_BUFFER, _1MB); conf.setLong(SOCKET_RECV_BUFFER, _1MB); conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize); conf.setInt(MULTIPART_SIZE, partitionSize); + conf.setInt(AWS_S3_VECTOR_ACTIVE_RANGE_READS, 32); conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate"); conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName()); S3ATestUtils.disableFilesystemCaching(conf); @@ -180,6 +203,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { IOStatistics iostats = fs.getIOStatistics(); String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol(); + String multipartBlockUploads = Statistic.MULTIPART_UPLOAD_PART_PUT.getSymbol(); String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol(); Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE; Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING; @@ -192,13 +216,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { true, uploadBlockSize, progress)) { - try { - streamStatistics = getOutputStreamStatistics(out); - } catch (ClassCastException e) { - LOG.info("Wrapped output stream is not block stream: {}", - out.getWrappedStream()); - streamStatistics = null; - } + streamStatistics = requireNonNull(getOutputStreamStatistics(out), + () -> "No iostatistics in " + out); for (long block = 1; block <= blocks; block++) { out.write(data); @@ -222,6 +241,13 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { writtenMB / elapsedTime)); } } + if (!expectMultipartUpload()) { + // it is required that no data has uploaded at this point on a + // non-multipart upload + Assertions.assertThat(progress.getUploadEvents()) + .describedAs("upload events in %s", progress) + .isEqualTo(0); + } // now close the file LOG.info("Closing stream {}", out); LOG.info("Statistics : {}", streamStatistics); @@ -235,34 +261,51 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { filesizeMB, uploadBlockSize); logFSState(); bandwidth(timer, filesize); - LOG.info("Statistics after stream closed: {}", streamStatistics); - LOG.info("IOStatistics after upload: {}", - demandStringifyIOStatistics(iostats)); - long putRequestCount = lookupCounterStatistic(iostats, putRequests); + final IOStatistics streamIOstats = streamStatistics.getIOStatistics(); + LOG.info("Stream IOStatistics after stream closed: {}", + ioStatisticsToPrettyString(streamIOstats)); + + LOG.info("FileSystem IOStatistics after upload: {}", + ioStatisticsToPrettyString(iostats)); + final String requestKey; long putByteCount = lookupCounterStatistic(iostats, putBytes); - Assertions.assertThat(putRequestCount) - .describedAs("Put request count from filesystem stats %s", - iostats) - .isGreaterThan(0); + long putRequestCount; + + if (expectMultipartUpload()) { + requestKey = multipartBlockUploads; + putRequestCount = lookupCounterStatistic(streamIOstats, requestKey); + assertThatStatisticCounter(streamIOstats, multipartBlockUploads) + .isGreaterThanOrEqualTo(1); + verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, putRequestCount); + // non-magic uploads will have completed + verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(), + expectImmediateFileVisibility() ? 1 : 0); + } else { + // single put + requestKey = putRequests; + putRequestCount = lookupCounterStatistic(streamIOstats, requestKey); + verifyStatisticCounterValue(streamIOstats, putRequests, 1); + verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, 1); + verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(), 0); + } Assertions.assertThat(putByteCount) - .describedAs("%s count from filesystem stats %s", - putBytes, iostats) + .describedAs("%s count from stream stats %s", + putBytes, streamStatistics) .isGreaterThan(0); + LOG.info("PUT {} bytes in {} operations; {} MB/operation", putByteCount, putRequestCount, putByteCount / (putRequestCount * _1MB)); LOG.info("Time per PUT {} nS", toHuman(timer.nanosPerOperation(putRequestCount))); verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0); - verifyStatisticGaugeValue(iostats, - STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0); + verifyStatisticGaugeValue(iostats, STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0); + progress.verifyNoFailures( "Put file " + fileToCreate + " of size " + filesize); - if (streamStatistics != null) { - assertEquals("actively allocated blocks in " + streamStatistics, - 0, streamStatistics.getBlocksActivelyAllocated()); - } + assertEquals("actively allocated blocks in " + streamStatistics, + 0, streamStatistics.getBlocksActivelyAllocated()); } /** @@ -290,10 +333,45 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { return hugefileRenamed; } - protected int getUploadBlockSize() { + public int getUploadBlockSize() { return uploadBlockSize; } + /** + * Get the desired upload block size for this test run. + * @return the block size + */ + protected int uploadBlockSize() { + return DEFAULT_UPLOAD_BLOCKSIZE; + } + + /** + * Get the size of the file. + * @return file size + */ + public long getFilesize() { + return filesize; + } + + /** + * Is this expected to be a multipart upload? + * Assertions will change if not. + * @return true by default. + */ + protected boolean expectMultipartUpload() { + return true; + } + + /** + * Is this expected to be a normal file creation with + * the output immediately visible? + * Assertions will change if not. + * @return true by default. + */ + protected boolean expectImmediateFileVisibility() { + return true; + } + protected int getPartitionSize() { return partitionSize; } @@ -304,6 +382,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { private final class ProgressCallback implements Progressable, ProgressListener { private AtomicLong bytesTransferred = new AtomicLong(0); + private AtomicLong uploadEvents = new AtomicLong(0); private AtomicInteger failures = new AtomicInteger(0); private final ContractTestUtils.NanoTimer timer; @@ -339,10 +418,11 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { progressEvent, writtenMB, elapsedTimeS, writtenMB / elapsedTimeS)); break; + case REQUEST_BYTE_TRANSFER_EVENT: + uploadEvents.incrementAndGet(); + break; default: - if (eventType.isByteCountEvent()) { - LOG.debug("Event {}", progressEvent); - } else { + if (!eventType.isByteCountEvent()) { LOG.info("Event {}", progressEvent); } break; @@ -352,12 +432,29 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { @Override public String toString() { String sb = "ProgressCallback{" - + "bytesTransferred=" + bytesTransferred + - ", failures=" + failures + + + "bytesTransferred=" + bytesTransferred.get() + + ", uploadEvents=" + uploadEvents.get() + + ", failures=" + failures.get() + '}'; return sb; } + /** + * Get the number of bytes transferred. + * @return byte count + */ + private long getBytesTransferred() { + return bytesTransferred.get(); + } + + /** + * Get the number of event callbacks. + * @return count of byte transferred events. + */ + private long getUploadEvents() { + return uploadEvents.get(); + } + private void verifyNoFailures(String operation) { assertEquals("Failures in " + operation + ": " + this, 0, failures.get()); } @@ -467,15 +564,42 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { rangeList.add(FileRange.createFileRange(2820861, 156770)); IntFunction allocate = ByteBuffer::allocate; FileSystem fs = getFileSystem(); - CompletableFuture builder = - fs.openFile(hugefile).build(); - try (FSDataInputStream in = builder.get()) { - in.readVectored(rangeList, allocate); - byte[] readFullRes = new byte[(int)filesize]; + + // read into a buffer first + // using sequential IO + + int validateSize = (int) Math.min(filesize, 10 * _1MB); + byte[] readFullRes; + IOStatistics sequentialIOStats, vectorIOStats; + try (FSDataInputStream in = fs.openFile(hugefile) + .opt(FS_OPTION_OPENFILE_LENGTH, validateSize) // lets us actually force a shorter read + .opt(FS_OPTION_OPENFILE_SPLIT_START, 0) + .opt(FS_OPTION_OPENFILE_SPLIT_END, validateSize) + .opt(FS_OPTION_OPENFILE_READ_POLICY, "sequential") + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize) + .build().get(); + DurationInfo ignored = new DurationInfo(LOG, "Sequential read of %,d bytes", + validateSize)) { + readFullRes = new byte[validateSize]; in.readFully(0, readFullRes); + sequentialIOStats = in.getIOStatistics(); + } + + // now do a vector IO read + try (FSDataInputStream in = fs.openFile(hugefile) + .opt(FS_OPTION_OPENFILE_LENGTH, filesize) + .opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random") + .build().get(); + DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) { + + in.readVectored(rangeList, allocate); // Comparing vectored read results with read fully. validateVectoredReadResult(rangeList, readFullRes); + vectorIOStats = in.getIOStatistics(); } + + LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(sequentialIOStats)); + LOG.info("Vector IOStatistics={}", ioStatisticsToPrettyString(vectorIOStats)); } /** @@ -493,7 +617,12 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { byte[] data = new byte[uploadBlockSize]; ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) { + try (FSDataInputStream in = fs.openFile(hugefile) + .withFileStatus(status) + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize) + .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .build().get(); + DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) { for (long block = 0; block < blocks; block++) { in.readFully(data); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java index 91ea0c8e62f..de903b3d75a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java @@ -260,7 +260,7 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase { .newPutObjectRequest(fs.pathToKey(file), om, null, new FailingInputStream()); futures.add(submit(executorService, () -> - writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs()))); + writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs(), null))); } LOG.info("Waiting for PUTs to complete"); waitForCompletion(futures); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java deleted file mode 100644 index 08192969e2d..00000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; - -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.s3a.Constants; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; -import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER; -import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; -import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; -import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; -import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; -import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; - -/** - * Test a file upload using a single PUT operation. Multipart uploads will - * be disabled in the test. - */ -public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase { - - public static final Logger LOG = LoggerFactory.getLogger( - ITestS3AHugeFileUploadSinglePut.class); - - private long fileSize; - - @Override - protected Configuration createScaleConfiguration() { - Configuration conf = super.createScaleConfiguration(); - removeBucketOverrides(getTestBucketName(conf), conf, - FAST_UPLOAD_BUFFER, - IO_CHUNK_BUFFER_SIZE, - KEY_HUGE_FILESIZE, - MULTIPART_UPLOADS_ENABLED, - MULTIPART_SIZE, - REQUEST_TIMEOUT); - conf.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false); - fileSize = getTestPropertyBytes(conf, KEY_HUGE_FILESIZE, - DEFAULT_HUGE_FILESIZE); - // set a small part size to verify it does not impact block allocation size - conf.setLong(MULTIPART_SIZE, 10_000); - conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK); - conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); - conf.set(REQUEST_TIMEOUT, "1h"); - return conf; - } - - @Test - public void uploadFileSinglePut() throws IOException { - LOG.info("Creating file with size : {}", fileSize); - S3AFileSystem fs = getFileSystem(); - ContractTestUtils.createAndVerifyFile(fs, - methodPath(), fileSize); - // Exactly three put requests should be made during the upload of the file - // First one being the creation of the directory marker - // Second being the creation of the test file - // Third being the creation of directory marker on the file delete - assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol()) - .isEqualTo(3); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java new file mode 100644 index 00000000000..ed300dba01e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Use a single PUT for the whole upload/rename/delete workflow; include verification + * that the transfer manager will fail fast unless the multipart threshold is huge. + */ +public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles { + + /** + * Size to ensure MPUs don't happen in transfer manager. + */ + public static final String S_1T = "1T"; + + public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h"; + + /** + * Always use disk storage. + * @return disk block store always. + */ + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_DISK; + } + + @Override + protected boolean expectMultipartUpload() { + return false; + } + + /** + * Create a configuration without multipart upload, + * and a long request timeout to allow for a very slow + * PUT in close. + * @return the configuration to create the test FS with. + */ + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + removeBaseAndBucketOverrides(conf, + IO_CHUNK_BUFFER_SIZE, + MIN_MULTIPART_THRESHOLD, + MULTIPART_UPLOADS_ENABLED, + MULTIPART_SIZE, + REQUEST_TIMEOUT); + conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360); + conf.set(MIN_MULTIPART_THRESHOLD, S_1T); + conf.set(MULTIPART_SIZE, S_1T); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT); + return conf; + } + + /** + * After the file is created, attempt a rename with an FS + * instance with a small multipart threshold; + * this MUST be rejected. + */ + @Override + public void test_030_postCreationAssertions() throws Throwable { + assumeHugeFileExists(); + final Path hugefile = getHugefile(); + final Path hugefileRenamed = getHugefileRenamed(); + describe("renaming %s to %s", hugefile, hugefileRenamed); + S3AFileSystem fs = getFileSystem(); + fs.delete(hugefileRenamed, false); + // create a new fs with a small multipart threshold; expect rename failure. + final Configuration conf = new Configuration(fs.getConf()); + conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); + conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE); + S3ATestUtils.disableFilesystemCaching(conf); + + try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) { + intercept(UnsupportedRequestException.class, () -> + fs2.rename(hugefile, hugefileRenamed)); + } + } +}