HADOOP-14661. Add S3 requester pays bucket support to S3A (#3962)
Adds the option fs.s3a.requester.pays.enabled, which, if set to true, allows the client to access S3 buckets where the requester is billed for the IO. Contributed by Daniel Carl Jones
parent 921267ca31
commit 9edfe30a60
@@ -157,6 +157,15 @@ public final class Constants {
      "fs.s3a.connection.ssl.enabled";
  public static final boolean DEFAULT_SECURE_CONNECTIONS = true;

  /**
   * Configuration option for S3 Requester Pays feature: {@value}.
   */
  public static final String ALLOW_REQUESTER_PAYS = "fs.s3a.requester.pays.enabled";

  /**
   * Default configuration for {@value ALLOW_REQUESTER_PAYS}: {@value}.
   */
  public static final boolean DEFAULT_ALLOW_REQUESTER_PAYS = false;

  // use OpenSSL or JSEE for secure connections
  public static final String SSL_CHANNEL_MODE = "fs.s3a.ssl.channel.mode";
  public static final DelegatingSSLSocketFactory.SSLChannelMode
@@ -54,6 +54,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;

import static com.amazonaws.services.s3.Headers.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;

@@ -75,6 +76,8 @@ public class DefaultS3ClientFactory extends Configured

  private static final String S3_SERVICE_NAME = "s3";

  private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";

  /**
   * Subclasses refer to this.
   */

@@ -118,6 +121,11 @@ public class DefaultS3ClientFactory extends Configured
    parameters.getHeaders().forEach((h, v) ->
        awsConf.addHeader(h, v));

    if (parameters.isRequesterPays()) {
      // All calls must acknowledge requester will pay via header.
      awsConf.addHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
    }

    // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
    // throttling is explicitly disabled on the S3 client so that
    // all failures are collected in S3A instrumentation, and its
@@ -841,6 +841,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
        .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
        .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
        .withUserAgentSuffix(uaSuffix)
        .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
        .withRequestHandlers(auditManager.createRequestHandlers());

    s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
@@ -101,7 +101,7 @@ public interface S3ClientFactory {
    private boolean pathStyleAccess;

    /**
     * This is in the settings awaiting wiring up and testing.
     * Permit requests to requester pays buckets.
     */
    private boolean requesterPays;

@@ -168,7 +168,7 @@ public interface S3ClientFactory {
    }

    /**
     * Requester pays option. Not yet wired up.
     * Set requester pays option.
     * @param value new value
     * @return the builder
     */
@@ -106,13 +106,6 @@ public class RequestFactoryImpl implements RequestFactory {
   */
  private final long multipartPartCountLimit;

  /**
   * Requester Pays.
   * This is to be wired up in a PR with its
   * own tests and docs.
   */
  private final boolean requesterPays;

  /**
   * Callback to prepare requests.
   */

@@ -133,7 +126,6 @@ public class RequestFactoryImpl implements RequestFactory {
    this.cannedACL = builder.cannedACL;
    this.encryptionSecrets = builder.encryptionSecrets;
    this.multipartPartCountLimit = builder.multipartPartCountLimit;
    this.requesterPays = builder.requesterPays;
    this.requestPreparer = builder.requestPreparer;
    this.contentEncoding = builder.contentEncoding;
  }

@@ -615,9 +607,6 @@ public class RequestFactoryImpl implements RequestFactory {
     */
    private CannedAccessControlList cannedACL = null;

    /** Requester Pays flag. */
    private boolean requesterPays = false;

    /** Content Encoding. */
    private String contentEncoding;

@@ -684,17 +673,6 @@ public class RequestFactoryImpl implements RequestFactory {
      return this;
    }

    /**
     * Requester Pays flag.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withRequesterPays(
        final boolean value) {
      requesterPays = value;
      return this;
    }

    /**
     * Multipart limit.
     * @param value new value
@@ -1633,6 +1633,25 @@ Before using Access Points make sure you're not impacted by the following:
considering endpoints, if you have any custom signers that use the host endpoint property make
sure to update them if needed;

## <a name="requester_pays"></a>Requester Pays buckets

S3A supports buckets with
[Requester Pays](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html)
enabled. When a bucket is configured with requester pays, the requester must cover
the per-request cost.

For requests to be successful, the S3 client must acknowledge that it will pay
for these requests by setting a request flag, usually a header, on each request.

To enable this feature within S3A, set the `fs.s3a.requester.pays.enabled` property to `true`:

```xml
<property>
  <name>fs.s3a.requester.pays.enabled</name>
  <value>true</value>
</property>
```
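For illustration only, here is a minimal sketch of enabling the option programmatically before reading from a requester pays bucket; the bucket and object names are placeholders, and in practice the option is normally set in `core-site.xml` as shown above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RequesterPaysReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Acknowledge that this client will be billed for requests
    // made against requester pays buckets.
    conf.setBoolean("fs.s3a.requester.pays.enabled", true);

    // Hypothetical requester pays bucket and object.
    Path path = new Path("s3a://example-requester-pays-bucket/data/sample.json");

    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
         FSDataInputStream in = fs.open(path)) {
      // Every GET issued by the stream carries the requester pays acknowledgement.
      System.out.println("First byte: " + in.read());
    }
  }
}
```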

## <a name="upload"></a>How S3A writes data to S3

The original S3A client implemented file writes by
@@ -593,6 +593,31 @@ your `core-site.xml` file, so that trying to use S3 select fails fast with
a meaningful error ("S3 Select not supported") rather than a generic Bad Request
exception.

### Testing Requester Pays

By default, the requester pays tests will look for a bucket that exists on Amazon S3
in us-east-1.

If the endpoint does support requester pays, you can specify an alternative object.
The test only requires an object of at least a few bytes in order
to check that lists and basic reads work.

```xml
<property>
  <name>test.fs.s3a.requester.pays.file</name>
  <value>s3a://my-req-pays-enabled-bucket/on-another-endpoint.json</value>
</property>
```

If the endpoint does not support requester pays, you can also disable the tests by configuring
the test URI as a single space.

```xml
<property>
  <name>test.fs.s3a.requester.pays.file</name>
  <value> </value>
</property>
```

### Testing Session Credentials
@@ -547,6 +547,20 @@ When trying to write or read SSE-KMS-encrypted data, the client gets a
The caller does not have the permissions to access
the key with which the data was encrypted.

### <a name="access_denied_requester_pays"></a>`AccessDeniedException` when using a "Requester Pays" enabled bucket

When making cross-account requests to a requester pays enabled bucket, all calls must acknowledge via a header that the requester will be billed.

If you do not enable this acknowledgement within S3A, you will see a message similar to this:

```
java.nio.file.AccessDeniedException: s3a://my-bucket/my-object: getFileStatus on s3a://my-bucket/my-object:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403;
Error Code: 403 Forbidden; Request ID: myshortreqid; S3 Extended Request ID: mylongreqid):403 Forbidden
```

To enable requester pays, set the `fs.s3a.requester.pays.enabled` property to `true`.
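As a sketch of the fix, the flag can be set on the `Configuration` used to create the filesystem, either globally or, assuming S3A's usual `fs.s3a.bucket.<name>.*` per-bucket override pattern applies to this option, for a single (hypothetical) bucket:

```java
import org.apache.hadoop.conf.Configuration;

public class EnableRequesterPaysSketch {
  public static Configuration withRequesterPays() {
    Configuration conf = new Configuration();
    // Global switch: applies to every S3A bucket this client talks to.
    conf.setBoolean("fs.s3a.requester.pays.enabled", true);
    // Assumed per-bucket alternative, limiting the acknowledgement to one
    // hypothetical bucket via the fs.s3a.bucket.<name>.* override pattern.
    conf.setBoolean("fs.s3a.bucket.my-bucket.requester.pays.enabled", true);
    return conf;
  }
}
```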

### <a name="no_region_session_credentials"></a> "Unable to find a region via the region provider chain." when using session credentials.

Region must be provided when requesting session credentials, or an exception will be thrown with the message:
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.nio.file.AccessDeniedException;

import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Tests for Requester Pays feature.
 */
public class ITestS3ARequesterPays extends AbstractS3ATestBase {

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        ALLOW_REQUESTER_PAYS,
        S3A_BUCKET_PROBE);
    return conf;
  }

  @Test
  public void testRequesterPaysOptionSuccess() throws Throwable {
    describe("Test requester pays enabled case by reading last then first byte");

    Configuration conf = this.createConfiguration();
    conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
    // Enable bucket exists check, the first failure point people may encounter
    conf.setInt(S3A_BUCKET_PROBE, 2);

    Path requesterPaysPath = getRequesterPaysPath(conf);

    try (
        FileSystem fs = requesterPaysPath.getFileSystem(conf);
        FSDataInputStream inputStream = fs.open(requesterPaysPath);
    ) {
      long fileLength = fs.getFileStatus(requesterPaysPath).getLen();

      inputStream.seek(fileLength - 1);
      inputStream.readByte();

      // Jump back to the start, triggering a new GetObject request.
      inputStream.seek(0);
      inputStream.readByte();

      // Verify > 1 call was made, so we're sure it is correctly configured for each request
      IOStatisticAssertions
          .assertThatStatisticCounter(inputStream.getIOStatistics(),
              StreamStatisticNames.STREAM_READ_OPENED)
          .isGreaterThan(1);

      // Check list calls work without error
      fs.listFiles(requesterPaysPath.getParent(), false);
    }
  }

  @Test
  public void testRequesterPaysDisabledFails() throws Throwable {
    describe("Verify expected failure for requester pays buckets when client has it disabled");

    Configuration conf = this.createConfiguration();
    conf.setBoolean(ALLOW_REQUESTER_PAYS, false);
    Path requesterPaysPath = getRequesterPaysPath(conf);

    try (FileSystem fs = requesterPaysPath.getFileSystem(conf)) {
      intercept(
          AccessDeniedException.class,
          "403 Forbidden",
          "Expected requester pays bucket to fail without header set",
          () -> fs.open(requesterPaysPath).close()
      );
    }
  }

  private Path getRequesterPaysPath(Configuration conf) {
    String requesterPaysFile =
        conf.getTrimmed(KEY_REQUESTER_PAYS_FILE, DEFAULT_REQUESTER_PAYS_FILE);
    S3ATestUtils.assume(
        "Empty test property: " + KEY_REQUESTER_PAYS_FILE,
        !requesterPaysFile.isEmpty()
    );
    return new Path(requesterPaysFile);
  }

}
@@ -97,6 +97,17 @@ public interface S3ATestConstants {
   */
  String DEFAULT_CSVTEST_FILE = LANDSAT_BUCKET + "scene_list.gz";

  /**
   * Configuration key for an existing object in a requester pays bucket: {@value}.
   * If not set, defaults to {@value DEFAULT_REQUESTER_PAYS_FILE}.
   */
  String KEY_REQUESTER_PAYS_FILE = TEST_FS_S3A + "requester.pays.file";

  /**
   * Default path for an S3 object inside a requester pays enabled bucket: {@value}.
   */
  String DEFAULT_REQUESTER_PAYS_FILE = "s3a://usgs-landsat/collection02/catalog.json";

  /**
   * Name of the property to define the timeout for scale tests: {@value}.
   * Measured in seconds.