HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)

Adds the option fs.s3a.requester.pays.enabled, which, if set to true, allows
the client to access S3 buckets where the requester is billed for the IO.

Contributed by Daniel Carl Jones

Change-Id: I51f64d0f9b3be3c4ec493bcf91927fca3b20407a
This commit is contained in:
Daniel Carl Jones 2022-03-23 20:00:50 +00:00 committed by Steve Loughran
parent ebdd8771c5
commit b749438a8c
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
10 changed files with 202 additions and 24 deletions

View File

@ -157,6 +157,15 @@ public final class Constants {
"fs.s3a.connection.ssl.enabled"; "fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true; public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
/**
* Configuration option for S3 Requester Pays feature: {@value}.
*/
public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled";
/**
* Default configuration for {@value ALLOW_REQUESTER_PAYS}: {@value}.
*/
public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false;
// use OpenSSL or JSEE for secure connections // use OpenSSL or JSEE for secure connections
public static final String SSL_CHANNEL_MODE = "fs.s3a.ssl.channel.mode"; public static final String SSL_CHANNEL_MODE = "fs.s3a.ssl.channel.mode";
public static final DelegatingSSLSocketFactory.SSLChannelMode public static final DelegatingSSLSocketFactory.SSLChannelMode

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector; import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.fs.store.LogExactlyOnce;
import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured
private static final String S3_SERVICE_NAME = "s3"; private static final String S3_SERVICE_NAME = "s3";
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
/** /**
* Subclasses refer to this. * Subclasses refer to this.
*/ */
@ -118,6 +121,11 @@ public class DefaultS3ClientFactory extends Configured
parameters.getHeaders().forEach((h, v) -> parameters.getHeaders().forEach((h, v) ->
awsConf.addHeader(h, v)); awsConf.addHeader(h, v));
if (parameters.isRequesterPays()) {
// All calls must acknowledge requester will pay via header.
awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
}
// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that // throttling is explicitly disabled on the S3 client so that
// all failures are collected in S3A instrumentation, and its // all failures are collected in S3A instrumentation, and its

View File

@ -841,6 +841,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.withMetrics(statisticsContext.newStatisticsFromAwsSdk()) .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false)) .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
.withUserAgentSuffix(uaSuffix) .withUserAgentSuffix(uaSuffix)
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withRequestHandlers(auditManager.createRequestHandlers()); .withRequestHandlers(auditManager.createRequestHandlers());
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)

View File

@ -101,7 +101,7 @@ public interface S3ClientFactory {
private boolean pathStyleAccess; private boolean pathStyleAccess;
/** /**
* This is in the settings awaiting wiring up and testing. * Permit requests to requester pays buckets.
*/ */
private boolean requesterPays; private boolean requesterPays;
@ -168,7 +168,7 @@ public interface S3ClientFactory {
} }
/** /**
* Requester pays option. Not yet wired up. * Set requester pays option.
* @param value new value * @param value new value
* @return the builder * @return the builder
*/ */

View File

@ -106,13 +106,6 @@ public class RequestFactoryImpl implements RequestFactory {
*/ */
private final long multipartPartCountLimit; private final long multipartPartCountLimit;
/**
* Requester Pays.
* This is to be wired up in a PR with its
* own tests and docs.
*/
private final boolean requesterPays;
/** /**
* Callback to prepare requests. * Callback to prepare requests.
*/ */
@ -133,7 +126,6 @@ public class RequestFactoryImpl implements RequestFactory {
this.cannedACL = builder.cannedACL; this.cannedACL = builder.cannedACL;
this.encryptionSecrets = builder.encryptionSecrets; this.encryptionSecrets = builder.encryptionSecrets;
this.multipartPartCountLimit = builder.multipartPartCountLimit; this.multipartPartCountLimit = builder.multipartPartCountLimit;
this.requesterPays = builder.requesterPays;
this.requestPreparer = builder.requestPreparer; this.requestPreparer = builder.requestPreparer;
this.contentEncoding = builder.contentEncoding; this.contentEncoding = builder.contentEncoding;
} }
@ -615,9 +607,6 @@ public class RequestFactoryImpl implements RequestFactory {
*/ */
private CannedAccessControlList cannedACL = null; private CannedAccessControlList cannedACL = null;
/** Requester Pays flag. */
private boolean requesterPays = false;
/** Content Encoding. */ /** Content Encoding. */
private String contentEncoding; private String contentEncoding;
@ -684,17 +673,6 @@ public class RequestFactoryImpl implements RequestFactory {
return this; return this;
} }
/**
* Requester Pays flag.
* @param value new value
* @return the builder
*/
public RequestFactoryBuilder withRequesterPays(
final boolean value) {
requesterPays = value;
return this;
}
/** /**
* Multipart limit. * Multipart limit.
* @param value new value * @param value new value

View File

@ -1630,6 +1630,25 @@ Before using Access Points make sure you're not impacted by the following:
considering endpoints, if you have any custom signers that use the host endpoint property make considering endpoints, if you have any custom signers that use the host endpoint property make
sure to update them if needed; sure to update them if needed;
## <a name="requester_pays"></a>Requester Pays buckets
S3A supports buckets with
[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html)
enabled. When a bucket is configured with requester pays, the requester must cover
the per-request cost.
For requests to be successful, the S3 client must acknowledge that they will pay
for these requests by setting a request flag, usually a header, on each request.
To enable this feature within S3A, configure the `fs.s3a.requester.pays.enabled` property.
```xml
<property>
<name>fs.s3a.requester.pays.enabled</name>
<value>true</value>
</property>
```
## <a name="upload"></a>How S3A writes data to S3 ## <a name="upload"></a>How S3A writes data to S3
The original S3A client implemented file writes by The original S3A client implemented file writes by

View File

@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with
a meaningful error ("S3 Select not supported") rather than a generic Bad Request a meaningful error ("S3 Select not supported") rather than a generic Bad Request
exception. exception.
### Testing Requester Pays
By default, the requester pays tests will look for a bucket that exists on Amazon S3
in us-east-1.
If the endpoint does support requester pays, you can specify an alternative object.
The test only requires an object of at least a few bytes in order
to check that lists and basic reads work.
```xml
<property>
<name>test.fs.s3a.requester.pays.file</name>
<value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value>
</property>
```
If the endpoint does not support requester pays, you can also disable the tests by configuring
the test URI as a single space.
```xml
<property>
<name>test.fs.s3a.requester.pays.file</name>
<value> </value>
</property>
```
### Testing Session Credentials ### Testing Session Credentials

View File

@ -547,6 +547,20 @@ When trying to write or read SEE-KMS-encrypted data, the client gets a
The caller does not have the permissions to access The caller does not have the permissions to access
the key with which the data was encrypted. the key with which the data was encrypted.
### <a name="access_denied_requester_pays"></a>`AccessDeniedException` when using a "Requester Pays" enabled bucket
When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed.
If you don't enable this acknowledgement within S3A, then you will see a message similar to this:
```
java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;
Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden
```
To enable requester pays, set `fs.s3a.requester.pays.enabled` property to `true`.
### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials. ### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.
Region must be provided when requesting session credentials, or an exception will be thrown with the message: Region must be provided when requesting session credentials, or an exception will be thrown with the message:

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.nio.file.AccessDeniedException;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Tests for Requester Pays feature.
*/
public class ITestS3ARequesterPays extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
ALLOW_REQUESTER_PAYS,
S3A_BUCKET_PROBE);
return conf;
}
@Test
public void testRequesterPaysOptionSuccess() throws Throwable {
describe("Test requester pays enabled case by reading last then first byte");
Configuration conf = this.createConfiguration();
conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
// Enable bucket exists check, the first failure point people may encounter
conf.setInt(S3A_BUCKET_PROBE, 2);
Path requesterPaysPath = getRequesterPaysPath(conf);
try (
FileSystem fs = requesterPaysPath.getFileSystem(conf);
FSDataInputStream inputStream = fs.open(requesterPaysPath);
) {
long fileLength = fs.getFileStatus(requesterPaysPath).getLen();
inputStream.seek(fileLength - 1);
inputStream.readByte();
// Jump back to the start, triggering a new GetObject request.
inputStream.seek(0);
inputStream.readByte();
// Verify > 1 call was made, so we're sure it is correctly configured for each request
IOStatisticAssertions
.assertThatStatisticCounter(inputStream.getIOStatistics(),
StreamStatisticNames.STREAM_READ_OPENED)
.isGreaterThan(1);
// Check list calls work without error
fs.listFiles(requesterPaysPath.getParent(), false);
}
}
@Test
public void testRequesterPaysDisabledFails() throws Throwable {
describe("Verify expected failure for requester pays buckets when client has it disabled");
Configuration conf = this.createConfiguration();
conf.setBoolean(ALLOW_REQUESTER_PAYS, false);
Path requesterPaysPath = getRequesterPaysPath(conf);
try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) {
intercept(
AccessDeniedException.class,
"403 Forbidden",
"Expected requester pays bucket to fail without header set",
() -> fs.open(requesterPaysPath).close()
);
}
}
private Path getRequesterPaysPath(Configuration conf) {
String requesterPaysFile =
conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE);
S3ATestUtils.assume(
"Empty test property: " + KEY_REQUESTER_PAYS_FILE,
!requesterPaysFile.isEmpty()
);
return new Path(requesterPaysFile);
}
}

View File

@ -97,6 +97,17 @@ public interface S3ATestConstants {
*/ */
String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz"; String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz";
/**
* Configuration key for an existing object in a requester pays bucket: {@value}.
* If not set, defaults to {@value DEFAULT_REQUESTER_PAYS_FILE}.
*/
String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file";
/**
* Default path for an S3 object inside a requester pays enabled bucket: {@value}.
*/
String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json";
/** /**
* Name of the property to define the timeout for scale tests: {@value}. * Name of the property to define the timeout for scale tests: {@value}.
* Measured in seconds. * Measured in seconds.