diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index e5369b84883..2e4f1e8361f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -429,6 +429,57 @@ public final class Constants { */ public static final String CONTENT_ENCODING = "fs.s3a.object.content.encoding"; + /** + * S3 storage class: standard, reduced_redundancy, intelligent_tiering etc. + * Value {@value }. + */ + public static final String STORAGE_CLASS = "fs.s3a.create.storage.class"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_STANDARD = "standard"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_REDUCED_REDUNDANCY = "reduced_redundancy"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_GLACIER = "glacier"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_STANDARD_INFREQUENT_ACCESS = "standard_ia"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_ONEZONE_INFREQUENT_ACCESS = "onezone_ia"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_INTELLIGENT_TIERING = "intelligent_tiering"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_DEEP_ARCHIVE = "deep_archive"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_OUTPOSTS = "outposts"; + + /** + * S3 Storage option: {@value}. + */ + public static final String STORAGE_CLASS_GLACIER_INSTANT_RETRIEVAL = "glacier_ir"; + // should we try to purge old multipart uploads when starting up public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a7d5af35e5f..7eec63dc121 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -66,6 +67,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Copy; @@ -963,6 +965,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, // Any encoding type String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null); + String storageClassConf = getConf() + .getTrimmed(STORAGE_CLASS, "") + .toUpperCase(Locale.US); + StorageClass storageClass; + try { + storageClass = StorageClass.fromValue(storageClassConf); + } catch (IllegalArgumentException e) { + LOG.warn("Unknown storage class property {}: {}; falling back to default storage class", + STORAGE_CLASS, 
storageClassConf); + storageClass = null; + } + return RequestFactoryImpl.builder() .withBucket(requireNonNull(bucket)) .withCannedACL(getCannedACL()) @@ -970,6 +984,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, .withMultipartPartCountLimit(partCountLimit) .withRequestPreparer(getAuditManager()::requestCreated) .withContentEncoding(contentEncoding) + .withStorageClass(storageClass) .build(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 97a15d95132..20cd27225a8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -44,6 +44,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.SelectObjectContentRequest; +import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.UploadPartRequest; import org.apache.hadoop.fs.PathIOException; @@ -106,6 +107,12 @@ public interface RequestFactory { */ String getContentEncoding(); + /** + * Get the object storage class, return null if none. + * @return storage class + */ + StorageClass getStorageClass(); + /** * Create a new object metadata instance. * Any standard metadata headers are added here, for example: diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 95bdc52884f..925d3566ddd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -46,6 +46,7 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.SelectObjectContentRequest; +import com.amazonaws.services.s3.model.StorageClass; import com.amazonaws.services.s3.model.UploadPartRequest; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -116,6 +117,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final String contentEncoding; + /** + * Storage class. + */ + private final StorageClass storageClass; + /** * Constructor. * @param builder builder with all the configuration. @@ -128,6 +134,7 @@ public class RequestFactoryImpl implements RequestFactory { this.multipartPartCountLimit = builder.multipartPartCountLimit; this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; + this.storageClass = builder.storageClass; } /** @@ -200,6 +207,15 @@ public class RequestFactoryImpl implements RequestFactory { return contentEncoding; } + /** + * Get the object storage class, return null if none. + * @return storage class + */ + @Override + public StorageClass getStorageClass() { + return storageClass; + } + /** * Sets server side encryption parameters to the part upload * request when encryption is enabled. @@ -343,7 +359,7 @@ public class RequestFactoryImpl implements RequestFactory { } /** * Create a putObject request. 
-   * Adds the ACL and metadata
+   * Adds the ACL, storage class and metadata
    * @param key key of object
    * @param metadata metadata header
    * @param srcfile source file
@@ -357,6 +373,9 @@
         srcfile);
     setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
+    if (storageClass != null) {
+      putObjectRequest.setStorageClass(storageClass);
+    }
     putObjectRequest.setMetadata(metadata);
     return prepareRequest(putObjectRequest);
   }
@@ -431,6 +450,9 @@
         destKey, newObjectMetadata(-1));
     initiateMPURequest.setCannedACL(getCannedACL());
+    if (getStorageClass() != null) {
+      initiateMPURequest.withStorageClass(getStorageClass());
+    }
     setOptionalMultipartUploadRequestParameters(initiateMPURequest);
     return prepareRequest(initiateMPURequest);
   }
@@ -610,6 +632,11 @@
     /** Content Encoding. */
     private String contentEncoding;
 
+    /**
+     * Storage class.
+     */
+    private StorageClass storageClass;
+
     /**
      * Multipart limit.
      */
@@ -641,6 +668,16 @@
       return this;
     }
 
+    /**
+     * Storage class.
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withStorageClass(final StorageClass value) {
+      storageClass = value;
+      return this;
+    }
+
     /**
      * Target bucket.
      * @param value new value
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 12cfcf984da..049087f0af9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1076,6 +1076,17 @@ options are covered in [Testing](./testing.md).
 </property>
 
+<property>
+  <name>fs.s3a.create.storage.class</name>
+  <value></value>
+  <description>
+    Storage class: standard, reduced_redundancy, intelligent_tiering, etc.
+    Specify the storage class for S3A PUT object requests.
+    If not set, the storage class will be left unset and S3 will store the
+    object with its default, the Standard storage class.
+  </description>
+</property>
+
 ```
 
 ## Retry and Recovery
@@ -1647,6 +1658,26 @@ To enable this feature within S3A, configure the `fs.s3a.requester.pays.enabled`
 ```
 
+## Storage Classes
+
+Amazon S3 offers a range of [Storage Classes](https://aws.amazon.com/s3/storage-classes/)
+that you can choose from based on the behavior of your applications. Choosing the right
+storage class can reduce the cost of storing your data.
+
+S3A uses the Standard storage class for PUT object requests by default, which is suitable
+for general use cases. To use a specific storage class, set the `fs.s3a.create.storage.class`
+property to the storage class you want.
+
+```xml
+<property>
+  <name>fs.s3a.create.storage.class</name>
+  <value>intelligent_tiering</value>
+</property>
+```
+
+Note that S3A does not currently support reading from archive storage classes;
+attempting to do so fails with an `AccessDeniedException` whose error code is `InvalidObjectState`.
+
 ## How S3A writes data to S3
 
 The original S3A client implemented file writes by
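The documentation above can be exercised from application code. The following is a minimal illustrative sketch, not part of the patch: the bucket and key names are made up, and it assumes the constants added in `Constants.java` plus the existing `HeaderProcessing` helpers used by `ITestS3AStorageClass` further down in this change.

```java
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_INTELLIGENT_TIERING;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STORAGE_CLASS;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes;

public class StorageClassExample {
  public static void main(String[] args) throws Exception {
    // hypothetical bucket and key, for illustration only
    Path path = new Path("s3a://example-bucket/data/file1");

    Configuration conf = new Configuration();
    // ask S3A to create new objects with the intelligent_tiering storage class
    conf.set(STORAGE_CLASS, STORAGE_CLASS_INTELLIGENT_TIERING);

    FileSystem fs = path.getFileSystem(conf);
    try (OutputStream out = fs.create(path)) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }

    // S3A surfaces the object's storage class as an XAttr
    Map<String, byte[]> xAttrs = fs.getXAttrs(path);
    System.out.println("storage class: " + decodeBytes(xAttrs.get(XA_STORAGE_CLASS)));
  }
}
```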
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index e1761538f21..44525c8ccd9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -572,6 +572,18 @@ can be turned off.
 Encryption is only used for those specific test suites with `Encryption` in
 their classname.
 
+### Disabling the storage class tests
+
+When running the storage class tests against a third-party object store that doesn't
+support S3 storage classes, these tests might fail. They can be disabled.
+
+```xml
+<property>
+  <name>test.fs.s3a.create.storage.class.enabled</name>
+  <value>false</value>
+</property>
+```
+
 ### Configuring the CSV file read tests**
 
 To test on alternate infrastructures supporting
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 96e6e287dea..903310a94bf 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -561,24 +561,41 @@ Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid
 
 To enable requester pays, set `fs.s3a.requester.pays.enabled` property to `true`.
 
+### `AccessDeniedException` "InvalidObjectState" when trying to read files
+
+```
+java.nio.file.AccessDeniedException: file1: copyFile(file1, file2) on file1: com.amazonaws.services.s3.model.AmazonS3Exception: Operation is not valid for the source object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: SK9EMPC1YRX75VZR; S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=; Proxy: null), S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=:InvalidObjectState
+
+Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Operation is not valid for the source object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: SK9EMPC1YRX75VZR; S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=; Proxy: null), S3 Extended Request ID: /nhUfdwJ+y5DLz6B4YR2FdA0FnQWwhDAkSCakn42zs2JssK3qWTrfwdNDiy6bOyXHOvJY0VAlHw=
+```
+
+This happens when you try to read or copy files whose storage class is one of the
+archive classes, such as Glacier.
+
+If you intend to read a file back with S3A after writing it, do not set `fs.s3a.create.storage.class` to `glacier` or `deep_archive`.
+
 ### "Unable to find a region via the region provider chain." when using session credentials.
 
-Region must be provided when requesting session credentials, or an exception will be thrown with the message:
+Region must be provided when requesting session credentials, or an exception will be thrown with the
+message:
+
 ```
 com.amazonaws.SdkClientException: Unable to find a region via the region provider chain. Must provide an explicit region in the builder or setup environment to supply a region.
``` -In this case you have to set the `fs.s3a.assumed.role.sts.endpoint` property to a valid -S3 sts endpoint and region like the following: + +In this case you have to set the `fs.s3a.assumed.role.sts.endpoint` property to a valid S3 sts +endpoint and region like the following: ```xml + fs.s3a.assumed.role.sts.endpoint ${sts.endpoint} - fs.s3a.assumed.role.sts.endpoint.region - ${sts.region} +fs.s3a.assumed.role.sts.endpoint.region +${sts.region} ``` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java new file mode 100644 index 00000000000..e141ef5aa32 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.nio.file.AccessDeniedException; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.s3a.S3AContract; + +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfStorageClassTestsDisabled; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_STORAGE_CLASS; +import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.decodeBytes; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Tests of storage class. 
+ */ +public class ITestS3AStorageClass extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + removeBaseAndBucketOverrides(conf, STORAGE_CLASS); + + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + skipIfStorageClassTestsDisabled(getConfiguration()); + } + + /* + * This test ensures the default storage class configuration (no config or null) + * works well with create and copy operations + */ + @Test + public void testCreateAndCopyObjectWithStorageClassDefault() throws Throwable { + Configuration conf = this.createConfiguration(); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + + FileSystem fs = contract.getTestFileSystem(); + Path dir = methodPath(); + fs.mkdirs(dir); + assertObjectHasNoStorageClass(dir); + Path path = new Path(dir, "file1"); + ContractTestUtils.touch(fs, path); + assertObjectHasNoStorageClass(path); + Path path2 = new Path(dir, "file1"); + fs.rename(path, path2); + assertObjectHasNoStorageClass(path2); + } + + /* + * Verify object can be created and copied correctly + * with specified storage class + */ + @Test + public void testCreateAndCopyObjectWithStorageClassReducedRedundancy() throws Throwable { + Configuration conf = this.createConfiguration(); + conf.set(STORAGE_CLASS, STORAGE_CLASS_REDUCED_REDUNDANCY); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + + FileSystem fs = contract.getTestFileSystem(); + Path dir = methodPath(); + fs.mkdirs(dir); + // even with storage class specified + // directories do not have storage class + assertObjectHasNoStorageClass(dir); + Path path = new Path(dir, "file1"); + ContractTestUtils.touch(fs, path); + assertObjectHasStorageClass(path, STORAGE_CLASS_REDUCED_REDUNDANCY); + Path path2 = new Path(dir, "file1"); + fs.rename(path, path2); + assertObjectHasStorageClass(path2, STORAGE_CLASS_REDUCED_REDUNDANCY); + } + + /* + * Archive storage classes have different behavior + * from general storage classes + */ + @Test + public void testCreateAndCopyObjectWithStorageClassGlacier() throws Throwable { + Configuration conf = this.createConfiguration(); + conf.set(STORAGE_CLASS, STORAGE_CLASS_GLACIER); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + + FileSystem fs = contract.getTestFileSystem(); + Path dir = methodPath(); + fs.mkdirs(dir); + // even with storage class specified + // directories do not have storage class + assertObjectHasNoStorageClass(dir); + Path path = new Path(dir, "file1"); + ContractTestUtils.touch(fs, path); + assertObjectHasStorageClass(path, STORAGE_CLASS_GLACIER); + Path path2 = new Path(dir, "file2"); + + // this is the current behavior + // object with archive storage class can't be read directly + // when trying to read it, AccessDeniedException will be thrown + // with message InvalidObjectState + intercept(AccessDeniedException.class, "InvalidObjectState", () -> fs.rename(path, path2)); + } + + /* + * Verify object can be created and copied correctly + * with completely invalid storage class + */ + @Test + public void testCreateAndCopyObjectWithStorageClassInvalid() throws Throwable { + Configuration conf = this.createConfiguration(); + conf.set(STORAGE_CLASS, "testing"); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + + FileSystem fs = contract.getTestFileSystem(); + Path dir = methodPath(); + fs.mkdirs(dir); 
+    // even with storage class specified
+    // directories do not have storage class
+    assertObjectHasNoStorageClass(dir);
+    Path path = new Path(dir, "file1");
+    ContractTestUtils.touch(fs, path);
+    assertObjectHasNoStorageClass(path);
+    Path path2 = new Path(dir, "file1");
+    fs.rename(path, path2);
+    assertObjectHasNoStorageClass(path2);
+  }
+
+  /*
+   * Verify object can be created and copied correctly
+   * with empty string configuration
+   */
+  @Test
+  public void testCreateAndCopyObjectWithStorageClassEmpty() throws Throwable {
+    Configuration conf = this.createConfiguration();
+    conf.set(STORAGE_CLASS, "");
+    S3AContract contract = (S3AContract) createContract(conf);
+    contract.init();
+
+    FileSystem fs = contract.getTestFileSystem();
+    Path dir = methodPath();
+    fs.mkdirs(dir);
+    // even with storage class specified
+    // directories do not have storage class
+    assertObjectHasNoStorageClass(dir);
+    Path path = new Path(dir, "file1");
+    ContractTestUtils.touch(fs, path);
+    assertObjectHasNoStorageClass(path);
+    Path path2 = new Path(dir, "file1");
+    fs.rename(path, path2);
+    assertObjectHasNoStorageClass(path2);
+  }
+
+  /**
+   * Assert that a given object has no storage class specified.
+   *
+   * @param path path
+   */
+  protected void assertObjectHasNoStorageClass(Path path) throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Map<String, byte[]> xAttrs = fs.getXAttrs(path);
+    String storageClass = decodeBytes(xAttrs.get(XA_STORAGE_CLASS));
+
+    Assertions.assertThat(storageClass).describedAs("Storage class of object %s", path).isNull();
+  }
+
+  /**
+   * Assert that a given object has the given storage class specified.
+   *
+   * @param path path
+   * @param expectedStorageClass expected storage class for the object
+   */
+  protected void assertObjectHasStorageClass(Path path, String expectedStorageClass)
+      throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Map<String, byte[]> xAttrs = fs.getXAttrs(path);
+    String actualStorageClass = decodeBytes(xAttrs.get(XA_STORAGE_CLASS));
+
+    Assertions.assertThat(actualStorageClass).describedAs("Storage class of object %s", path)
+        .isEqualToIgnoringCase(expectedStorageClass);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 6a74338cdeb..47ff2f326ef 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -53,6 +53,11 @@
    */
   String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
 
+  /**
+   * A property set to true if storage class tests are enabled: {@value }.
+   */
+  String KEY_STORAGE_CLASS_TESTS_ENABLED = TEST_FS_S3A + "create.storage.class.enabled";
+
   /**
    * Tell tests that they are being executed in parallel: {@value}.
    */
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index e20de5edf06..21ad7f87d6e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -465,6 +465,17 @@ public final class S3ATestUtils {
     }
   }
 
+  /**
+   * Skip a test if storage class tests are disabled.
+ * @param configuration configuration to probe + */ + public static void skipIfStorageClassTestsDisabled( + Configuration configuration) { + if (!configuration.getBoolean(KEY_STORAGE_CLASS_TESTS_ENABLED, true)) { + skip("Skipping storage class tests"); + } + } + /** * Create a test path, using the value of * {@link S3ATestConstants#TEST_UNIQUE_FORK_ID} if it is set. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 4cb90a7373c..bf99d495769 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -71,6 +71,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_INTELLIGENT_TIERING; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID; @@ -695,6 +697,22 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { expectedOutput.toString(), output); } + /** + * Verify storage class of output file matches the expected storage class. + * @param dir output directory. + * @param expectedStorageClass expected storage class value. + * @throws Exception failure. + */ + private void validateStorageClass(Path dir, String expectedStorageClass) throws Exception { + Path expectedFile = getPart0000(dir); + S3AFileSystem fs = getFileSystem(); + String actualStorageClass = fs.getObjectMetadata(expectedFile).getStorageClass(); + + Assertions.assertThat(actualStorageClass) + .describedAs("Storage class of object %s", expectedFile) + .isEqualToIgnoringCase(expectedStorageClass); + } + /** * Identify any path under the directory which begins with the * {@code "part-m-00000"} sequence. 
@@ -796,6 +814,41 @@ public abstract class AbstractITCommitProtocol extends AbstractCommitITest { assertNoMultipartUploadsPending(outDir); } + @Test + public void testCommitWithStorageClassConfig() throws Exception { + describe("Commit with specific storage class configuration;" + + " expect the final file has correct storage class."); + + Configuration conf = getConfiguration(); + skipIfStorageClassTestsDisabled(conf); + conf.set(STORAGE_CLASS, STORAGE_CLASS_INTELLIGENT_TIERING); + + JobData jobData = startJob(false); + JobContext jContext = jobData.jContext; + TaskAttemptContext tContext = jobData.tContext; + AbstractS3ACommitter committer = jobData.committer; + validateTaskAttemptWorkingDirectory(committer, tContext); + + // write output + writeTextOutput(tContext); + + // commit task + dumpMultipartUploads(); + commitTask(committer, tContext); + + // commit job + assertMultipartUploadsPending(outDir); + commitJob(committer, jContext); + + // validate output + validateContent(outDir, shouldExpectSuccessMarker(), + committer.getUUID()); + assertNoMultipartUploadsPending(outDir); + + // validate storage class + validateStorageClass(outDir, STORAGE_CLASS_INTELLIGENT_TIERING); + } + @Test public void testCommitterWithDuplicatedCommit() throws Exception { describe("Call a task then job commit twice;" + diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesStorageClass.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesStorageClass.java new file mode 100644 index 00000000000..99407467df5 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesStorageClass.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.scale; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.bandwidth; +import static org.apache.hadoop.fs.contract.ContractTestUtils.toHuman; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS; +import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfStorageClassTestsDisabled; + +/** + * Class to verify that {@link Constants#STORAGE_CLASS} is set correctly + * for creating and renaming huge files with multipart upload requests. + */ +public class ITestS3AHugeFilesStorageClass extends AbstractSTestS3AHugeFiles { + + private static final Logger LOG = LoggerFactory.getLogger(ITestS3AHugeFilesStorageClass.class); + + @Override + public void setup() throws Exception { + super.setup(); + skipIfStorageClassTestsDisabled(getConfiguration()); + } + + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + disableFilesystemCaching(conf); + removeBaseAndBucketOverrides(conf, STORAGE_CLASS); + + conf.set(STORAGE_CLASS, STORAGE_CLASS_REDUCED_REDUNDANCY); + return conf; + } + + @Override + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_ARRAY; + } + + @Override + public void test_010_CreateHugeFile() throws IOException { + super.test_010_CreateHugeFile(); + assertStorageClass(getPathOfFileToCreate()); + } + + @Override + public void test_030_postCreationAssertions() throws Throwable { + super.test_030_postCreationAssertions(); + assertStorageClass(getPathOfFileToCreate()); + } + + @Override + public void test_040_PositionedReadHugeFile() throws Throwable { + skipQuietly("PositionedReadHugeFile"); + } + + @Override + public void test_050_readHugeFile() throws Throwable { + skipQuietly("readHugeFile"); + } + + @Override + public void test_090_verifyRenameSourceEncryption() throws IOException { + skipQuietly("verifyRenameSourceEncryption"); + } + + @Override + public void test_100_renameHugeFile() throws Throwable { + Path hugefile = getHugefile(); + Path hugefileRenamed = getHugefileRenamed(); + assumeHugeFileExists(); + describe("renaming %s to %s", hugefile, hugefileRenamed); + S3AFileSystem fs = getFileSystem(); + FileStatus status = fs.getFileStatus(hugefile); + long size = status.getLen(); + fs.delete(hugefileRenamed, false); + ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); + fs.rename(hugefile, hugefileRenamed); + long mb = Math.max(size / _1MB, 1); + timer.end("time to rename file of %d MB", mb); + LOG.info("Time per MB to rename = {} nS", toHuman(timer.nanosPerOperation(mb))); + bandwidth(timer, size); + FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed); + assertEquals(size, destFileStatus.getLen()); + assertStorageClass(hugefileRenamed); + } + + @Override + public void test_110_verifyRenameDestEncryption() throws IOException { + skipQuietly("verifyRenameDestEncryption"); + } + + private void 
skipQuietly(String text) { + describe("Skipping: %s", text); + } + + protected void assertStorageClass(Path hugeFile) throws IOException { + S3AFileSystem fs = getFileSystem(); + String actual = fs.getObjectMetadata(hugeFile).getStorageClass(); + + assertTrue( + "Storage class of object is " + actual + ", expected " + STORAGE_CLASS_REDUCED_REDUNDANCY, + STORAGE_CLASS_REDUCED_REDUNDANCY.equalsIgnoreCase(actual)); + } +}
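The storage class resolution added to `S3AFileSystem` when it builds its request factory can be summarised in a standalone sketch: the configured value is upper-cased and mapped onto the AWS SDK `StorageClass` enum, and anything unrecognised, including the empty string, degrades to no explicit storage class. This is illustrative code, not part of the patch, and assumes the AWS SDK v1 `StorageClass` enum used throughout the change.

```java
import java.util.Locale;

import com.amazonaws.services.s3.model.StorageClass;

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;

public final class StorageClassResolution {

  /**
   * Resolve the configured storage class name to the SDK enum,
   * falling back to null (bucket default) on any unknown value.
   */
  static StorageClass resolveStorageClass(Configuration conf) {
    String name = conf.getTrimmed(STORAGE_CLASS, "").toUpperCase(Locale.US);
    try {
      // "intelligent_tiering" -> StorageClass.IntelligentTiering, etc.
      return StorageClass.fromValue(name);
    } catch (IllegalArgumentException e) {
      // unknown or empty value: no explicit storage class on requests
      return null;
    }
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set(STORAGE_CLASS, "intelligent_tiering");
    System.out.println(resolveStorageClass(conf));  // INTELLIGENT_TIERING

    conf.set(STORAGE_CLASS, "testing");             // invalid, as in the new test
    System.out.println(resolveStorageClass(conf));  // null
  }
}
```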