From 9edfe30a60f4c43405301927a8f4014d1dd3842d Mon Sep 17 00:00:00 2001 From: Daniel Carl Jones Date: Wed, 23 Mar 2022 20:00:50 +0000 Subject: [PATCH] HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962) Adds the option fs.s3a.requester.pays.enabled, which, if set to true, allows the client to access S3 buckets where the requester is billed for the IO. Contributed by Daniel Carl Jones --- .../org/apache/hadoop/fs/s3a/Constants.java | 9 ++ .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 8 ++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 1 + .../apache/hadoop/fs/s3a/S3ClientFactory.java | 4 +- .../fs/s3a/impl/RequestFactoryImpl.java | 22 ---- .../site/markdown/tools/hadoop-aws/index.md | 19 +++ .../site/markdown/tools/hadoop-aws/testing.md | 25 ++++ .../tools/hadoop-aws/troubleshooting_s3a.md | 14 +++ .../hadoop/fs/s3a/ITestS3ARequesterPays.java | 113 ++++++++++++++++++ .../hadoop/fs/s3a/S3ATestConstants.java | 11 ++ 10 files changed, 202 insertions(+), 24 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index dd7e4258809..cb3d72e5668 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -157,6 +157,15 @@ public final class Constants { "fs.s3a.connection.ssl.enabled"; public static final boolean DEFAULT_SECURE_CONNECTIONS = true; + /** + * Configuration option for S3 Requester Pays feature: {@value}. + */ + public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled"; + /** + * Default configuration for {@value ALLOW_REQUESTER_PAYS}: {@value}. 
+ */ + public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false; + // use OpenSSL or JSEE for secure connections public static final String SSL_CHANNEL_MODE = "fs.s3a.ssl.channel.mode"; public static final DelegatingSSLSocketFactory.SSLChannelMode diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index c14558adf54..c374ef7397c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector; import org.apache.hadoop.fs.store.LogExactlyOnce; +import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER; import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION; import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; @@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured private static final String S3_SERVICE_NAME = "s3"; + private static final String REQUESTER_PAYS_HEADER_VALUE = "requester"; + /** * Subclasses refer to this. */ @@ -118,6 +121,11 @@ public class DefaultS3ClientFactory extends Configured parameters.getHeaders().forEach((h, v) -> awsConf.addHeader(h, v)); + if (parameters.isRequesterPays()) { + // All calls must acknowledge requester will pay via header. 
+ awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE); + } + // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false // throttling is explicitly disabled on the S3 client so that // all failures are collected in S3A instrumentation, and its diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 4b450c4dcce..83c3a74f5b3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -841,6 +841,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, .withMetrics(statisticsContext.newStatisticsFromAwsSdk()) .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false)) .withUserAgentSuffix(uaSuffix) + .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS)) .withRequestHandlers(auditManager.createRequestHandlers()); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 5ef99ed6f5c..34674c78890 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -101,7 +101,7 @@ public interface S3ClientFactory { private boolean pathStyleAccess; /** - * This is in the settings awaiting wiring up and testing. + * Permit requests to requester pays buckets. */ private boolean requesterPays; @@ -168,7 +168,7 @@ public interface S3ClientFactory { } /** - * Requester pays option. Not yet wired up. + * Set requester pays option. 
* @param value new value * @return the builder */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index fa58323decd..a73e7199380 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -106,13 +106,6 @@ public class RequestFactoryImpl implements RequestFactory { */ private final long multipartPartCountLimit; - /** - * Requester Pays. - * This is to be wired up in a PR with its - * own tests and docs. - */ - private final boolean requesterPays; - /** * Callback to prepare requests. */ @@ -133,7 +126,6 @@ public class RequestFactoryImpl implements RequestFactory { this.cannedACL = builder.cannedACL; this.encryptionSecrets = builder.encryptionSecrets; this.multipartPartCountLimit = builder.multipartPartCountLimit; - this.requesterPays = builder.requesterPays; this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; } @@ -615,9 +607,6 @@ public class RequestFactoryImpl implements RequestFactory { */ private CannedAccessControlList cannedACL = null; - /** Requester Pays flag. */ - private boolean requesterPays = false; - /** Content Encoding. */ private String contentEncoding; @@ -684,17 +673,6 @@ public class RequestFactoryImpl implements RequestFactory { return this; } - /** - * Requester Pays flag. - * @param value new value - * @return the builder - */ - public RequestFactoryBuilder withRequesterPays( - final boolean value) { - requesterPays = value; - return this; - } - /** * Multipart limit. 
* @param value new value diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index aa4e13ff3df..df08a969e95 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1633,6 +1633,25 @@ Before using Access Points make sure you're not impacted by the following: considering endpoints, if you have any custom signers that use the host endpoint property make sure to update them if needed; +## Requester Pays buckets + +S3A supports buckets with +[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html) +enabled. When a bucket is configured with requester pays, the requester must cover +the per-request cost. + +For requests to be successful, the S3 client must acknowledge that they will pay +for these requests by setting a request flag, usually a header, on each request. + +To enable this feature within S3A, configure the `fs.s3a.requester.pays.enabled` property. + +```xml +<property> + <name>fs.s3a.requester.pays.enabled</name> + <value>true</value> +</property> +``` + ## How S3A writes data to S3 The original S3A client implemented file writes by diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 559687a3fdb..2641b870d2e 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with a meaningful error ("S3 Select not supported") rather than a generic Bad Request exception. +### Testing Requester Pays + +By default, the requester pays tests will look for a bucket that exists on Amazon S3 +in us-east-1. + +If the endpoint does support requester pays, you can specify an alternative object.
+The test only requires an object of at least a few bytes in order +to check that lists and basic reads work. + +```xml +<property> + <name>test.fs.s3a.requester.pays.file</name> + <value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value> +</property> +``` + +If the endpoint does not support requester pays, you can also disable the tests by configuring +the test URI as a single space. + +```xml +<property> + <name>test.fs.s3a.requester.pays.file</name> + <value> </value> +</property> +``` ### Testing Session Credentials diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 3019b8525db..96e6e287dea 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -547,6 +547,20 @@ When trying to write or read SEE-KMS-encrypted data, the client gets a The caller does not have the permissions to access the key with which the data was encrypted. +### `AccessDeniedException` when using a "Requester Pays" enabled bucket + +When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed. + +If you don't enable this acknowledgement within S3A, then you will see a message similar to this: + +``` +java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object: +com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; +Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden +``` + +To enable requester pays, set the `fs.s3a.requester.pays.enabled` property to `true`. + ### "Unable to find a region via the region provider chain." when using session credentials.
Region must be provided when requesting session credentials, or an exception will be thrown with the message: diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java new file mode 100644 index 00000000000..c2e7684cad6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.nio.file.AccessDeniedException; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatisticAssertions; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; + +import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; +import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Tests for Requester Pays feature. 
+ */ +public class ITestS3ARequesterPays extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + ALLOW_REQUESTER_PAYS, + S3A_BUCKET_PROBE); + return conf; + } + + @Test + public void testRequesterPaysOptionSuccess() throws Throwable { + describe("Test requester pays enabled case by reading last then first byte"); + + Configuration conf = this.createConfiguration(); + conf.setBoolean(ALLOW_REQUESTER_PAYS, true); + // Enable bucket exists check, the first failure point people may encounter + conf.setInt(S3A_BUCKET_PROBE, 2); + + Path requesterPaysPath = getRequesterPaysPath(conf); + + try ( + FileSystem fs = requesterPaysPath.getFileSystem(conf); + FSDataInputStream inputStream = fs.open(requesterPaysPath); + ) { + long fileLength = fs.getFileStatus(requesterPaysPath).getLen(); + + inputStream.seek(fileLength - 1); + inputStream.readByte(); + + // Jump back to the start, triggering a new GetObject request. 
+ inputStream.seek(0); + inputStream.readByte(); + + // Verify > 1 call was made, so we're sure it is correctly configured for each request + IOStatisticAssertions + .assertThatStatisticCounter(inputStream.getIOStatistics(), + StreamStatisticNames.STREAM_READ_OPENED) + .isGreaterThan(1); + + // Check list calls work without error + fs.listFiles(requesterPaysPath.getParent(), false); + } + } + + @Test + public void testRequesterPaysDisabledFails() throws Throwable { + describe("Verify expected failure for requester pays buckets when client has it disabled"); + + Configuration conf = this.createConfiguration(); + conf.setBoolean(ALLOW_REQUESTER_PAYS, false); + Path requesterPaysPath = getRequesterPaysPath(conf); + + try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) { + intercept( + AccessDeniedException.class, + "403 Forbidden", + "Expected requester pays bucket to fail without header set", + () -> fs.open(requesterPaysPath).close() + ); + } + } + + private Path getRequesterPaysPath(Configuration conf) { + String requesterPaysFile = + conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE); + S3ATestUtils.assume( + "Empty test property: " + KEY_REQUESTER_PAYS_FILE, + !requesterPaysFile.isEmpty() + ); + return new Path(requesterPaysFile); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index aca622a9e20..742c22ac5a5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -97,6 +97,17 @@ public interface S3ATestConstants { */ String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz"; + /** + * Configuration key for an existing object in a requester pays bucket: {@value}. + * If not set, defaults to {@value DEFAULT_REQUESTER_PAYS_FILE}. 
+ */ + String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file"; + + /** + * Default path for an S3 object inside a requester pays enabled bucket: {@value}. + */ + String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json"; + /** * Name of the property to define the timeout for scale tests: {@value}. * Measured in seconds.