HADOOP-17705. S3A to add Config to set AWS region (#3020)
The option `fs.s3a.endpoint.region` can be used to explicitly set the AWS region of a bucket. This is needed when using AWS Private Link, as the region cannot be automatically determined. Contributed by Mehakmeet Singh
This commit is contained in:
parent
c665ab02ed
commit
5f400032b6
|
@ -1081,4 +1081,10 @@ public final class Constants {
|
||||||
*/
|
*/
|
||||||
public static final String XA_HEADER_PREFIX = "header.";
|
public static final String XA_HEADER_PREFIX = "header.";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AWS S3 region for the bucket. When set, this bypasses the construction of
|
||||||
|
* the region from the endpoint URL.
|
||||||
|
*/
|
||||||
|
public static final String AWS_REGION = "fs.s3a.endpoint.region";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
|
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
|
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
|
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
|
||||||
|
|
||||||
|
@ -132,7 +133,7 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
// endpoint set up is a PITA
|
// endpoint set up is a PITA
|
||||||
AwsClientBuilder.EndpointConfiguration epr
|
AwsClientBuilder.EndpointConfiguration epr
|
||||||
= createEndpointConfiguration(parameters.getEndpoint(),
|
= createEndpointConfiguration(parameters.getEndpoint(),
|
||||||
awsConf);
|
awsConf, getConf().getTrimmed(AWS_REGION));
|
||||||
if (epr != null) {
|
if (epr != null) {
|
||||||
// an endpoint binding was constructed: use it.
|
// an endpoint binding was constructed: use it.
|
||||||
b.withEndpointConfiguration(epr);
|
b.withEndpointConfiguration(epr);
|
||||||
|
@ -197,12 +198,14 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
*
|
*
|
||||||
* @param endpoint possibly null endpoint.
|
* @param endpoint possibly null endpoint.
|
||||||
* @param awsConf config to build the URI from.
|
* @param awsConf config to build the URI from.
|
||||||
|
* @param awsRegion AWS S3 Region if the corresponding config is set.
|
||||||
* @return a configuration for the S3 client builder.
|
* @return a configuration for the S3 client builder.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static AwsClientBuilder.EndpointConfiguration
|
public static AwsClientBuilder.EndpointConfiguration
|
||||||
createEndpointConfiguration(
|
createEndpointConfiguration(
|
||||||
final String endpoint, final ClientConfiguration awsConf) {
|
final String endpoint, final ClientConfiguration awsConf,
|
||||||
|
String awsRegion) {
|
||||||
LOG.debug("Creating endpoint configuration for {}", endpoint);
|
LOG.debug("Creating endpoint configuration for {}", endpoint);
|
||||||
if (endpoint == null || endpoint.isEmpty()) {
|
if (endpoint == null || endpoint.isEmpty()) {
|
||||||
// the default endpoint...we should be using null at this point.
|
// the default endpoint...we should be using null at this point.
|
||||||
|
@ -212,8 +215,8 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
|
|
||||||
final URI epr = RuntimeHttpUtils.toUri(endpoint, awsConf);
|
final URI epr = RuntimeHttpUtils.toUri(endpoint, awsConf);
|
||||||
LOG.debug("Endpoint URI = {}", epr);
|
LOG.debug("Endpoint URI = {}", epr);
|
||||||
|
String region = awsRegion;
|
||||||
String region;
|
if (StringUtils.isBlank(region)) {
|
||||||
if (!ServiceUtils.isS3USStandardEndpoint(endpoint)) {
|
if (!ServiceUtils.isS3USStandardEndpoint(endpoint)) {
|
||||||
LOG.debug("Endpoint {} is not the default; parsing", epr);
|
LOG.debug("Endpoint {} is not the default; parsing", epr);
|
||||||
region = AwsHostNameUtils.parseRegion(
|
region = AwsHostNameUtils.parseRegion(
|
||||||
|
@ -221,9 +224,11 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
S3_SERVICE_NAME);
|
S3_SERVICE_NAME);
|
||||||
} else {
|
} else {
|
||||||
// US-east, set region == null.
|
// US-east, set region == null.
|
||||||
LOG.debug("Endpoint {} is the standard one; declare region as null", epr);
|
LOG.debug("Endpoint {} is the standard one; declare region as null",
|
||||||
|
epr);
|
||||||
region = null;
|
region = null;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
LOG.debug("Region for endpoint {}, URI {} is determined as {}",
|
LOG.debug("Region for endpoint {}, URI {} is determined as {}",
|
||||||
endpoint, epr, region);
|
endpoint, epr, region);
|
||||||
return new AwsClientBuilder.EndpointConfiguration(endpoint, region);
|
return new AwsClientBuilder.EndpointConfiguration(endpoint, region);
|
||||||
|
|
|
@ -797,6 +797,14 @@ options are covered in [Testing](./testing.md).
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.endpoint.region</name>
|
||||||
|
<description>AWS S3 region for a bucket, which bypasses the parsing of
|
||||||
|
fs.s3a.endpoint to determine the region. This helps to avoid errors
|
||||||
|
when using a PrivateLink URL, by explicitly setting the bucket region.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.s3a.path.style.access</name>
|
<name>fs.s3a.path.style.access</name>
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
|
|
|
@ -247,6 +247,32 @@ As an example, the endpoint for S3 Frankfurt is `s3.eu-central-1.amazonaws.com`:
|
||||||
<value>s3.eu-central-1.amazonaws.com</value>
|
<value>s3.eu-central-1.amazonaws.com</value>
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
### <a name="AuthorizationHeaderMalformed"></a> "Authorization Header is Malformed" (400) exception when a PrivateLink URL is used in "fs.s3a.endpoint"
|
||||||
|
|
||||||
|
When a [PrivateLink](https://docs.aws.amazon.com/AmazonS3/latest/userguide/privatelink-interface-endpoints.html) URL
|
||||||
|
is used instead of the standard S3 endpoint, requests fail with an "authorization
|
||||||
|
header is malformed" exception. So, if we set
|
||||||
|
`fs.s3a.endpoint=bucket.vpce-<some_string>.s3.ca-central-1.vpce.amazonaws.com` and make S3 calls, we get:
|
||||||
|
```
|
||||||
|
com.amazonaws.services.s3.model.AmazonS3Exception: The authorization header is malformed; the region 'vpce' is wrong; expecting 'ca-central-1'
|
||||||
|
(Service: Amazon S3; Status Code: 400; Error Code: AuthorizationHeaderMalformed; Request ID: req-id; S3 Extended Request ID: req-id-2), S3 Extended Request ID: req-id-2:AuthorizationHeaderMalformed: The authorization
|
||||||
|
header is malformed; the region 'vpce' is wrong; expecting 'ca-central-1' (Service: Amazon S3; Status Code: 400; Error Code: AuthorizationHeaderMalformed; Request ID: req-id;
|
||||||
|
```
|
||||||
|
Cause:
|
||||||
|
|
||||||
|
Endpoint parsing is done in a way that assumes the AWS S3 region
|
||||||
|
is the 2nd component of the `fs.s3a.endpoint` URL delimited by ".". In the
|
||||||
|
case of a PrivateLink URL, it cannot determine the correct region (it picks `vpce` in the error above), and the request fails with an
|
||||||
|
authorization exception. To support PrivateLink URLs, use `fs.s3a.endpoint.region`
|
||||||
|
to set the region explicitly and bypass this parsing of `fs.s3a.endpoint`. For the case shown above, set the AWS
|
||||||
|
S3 region to `ca-central-1`:
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.endpoint.region</name>
|
||||||
|
<value>ca-central-1</value>
|
||||||
|
</property>
|
||||||
|
```
|
||||||
|
|
||||||
### `Class does not implement AWSCredentialsProvider`
|
### `Class does not implement AWSCredentialsProvider`
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||||
|
import com.amazonaws.util.AwsHostNameUtils;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to check correctness of S3A endpoint regions in
|
||||||
|
* {@link DefaultS3ClientFactory}.
|
||||||
|
*/
|
||||||
|
public class ITestS3AEndpointRegion extends AbstractS3ATestBase {
|
||||||
|
|
||||||
|
private static final String AWS_REGION_TEST = "test-region";
|
||||||
|
private static final String AWS_ENDPOINT_TEST = "test-endpoint";
|
||||||
|
private static final String AWS_ENDPOINT_TEST_WITH_REGION =
|
||||||
|
"test-endpoint.some-region.amazonaws.com";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that setting a region with the config would bypass the
|
||||||
|
* construction of region from endpoint.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWithRegionConfig() {
|
||||||
|
getFileSystem().getConf().set(AWS_REGION, AWS_REGION_TEST);
|
||||||
|
|
||||||
|
//Creating an endpoint config with a custom endpoint.
|
||||||
|
AwsClientBuilder.EndpointConfiguration epr = createEpr(AWS_ENDPOINT_TEST,
|
||||||
|
getFileSystem().getConf().getTrimmed(AWS_REGION));
|
||||||
|
//Checking if setting region config bypasses the endpoint region.
|
||||||
|
Assertions.assertThat(epr.getSigningRegion())
|
||||||
|
.describedAs("There is a region mismatch")
|
||||||
|
.isEqualTo(getFileSystem().getConf().get(AWS_REGION));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify that not setting the region config leads to using
|
||||||
|
* endpoint to construct the region.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWithoutRegionConfig() {
|
||||||
|
getFileSystem().getConf().unset(AWS_REGION);
|
||||||
|
|
||||||
|
//Creating an endpoint config with a custom endpoint containing a region.
|
||||||
|
AwsClientBuilder.EndpointConfiguration eprRandom =
|
||||||
|
createEpr(AWS_ENDPOINT_TEST_WITH_REGION,
|
||||||
|
getFileSystem().getConf().getTrimmed(AWS_REGION));
|
||||||
|
String regionFromEndpoint =
|
||||||
|
AwsHostNameUtils
|
||||||
|
.parseRegionFromAwsPartitionPattern(AWS_ENDPOINT_TEST_WITH_REGION);
|
||||||
|
//Checking if not setting region config leads to constructing the region
|
||||||
|
// from endpoint.
|
||||||
|
Assertions.assertThat(eprRandom.getSigningRegion())
|
||||||
|
.describedAs("There is a region mismatch")
|
||||||
|
.isNotEqualTo(getFileSystem().getConf().get(AWS_REGION))
|
||||||
|
.isEqualTo(regionFromEndpoint);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method to create EndpointConfiguration using an endpoint.
|
||||||
|
*
|
||||||
|
* @param endpoint the endpoint to be used for EndpointConfiguration creation.
|
||||||
|
* @return an instance of EndpointConfiguration.
|
||||||
|
*/
|
||||||
|
private AwsClientBuilder.EndpointConfiguration createEpr(String endpoint,
|
||||||
|
String awsRegion) {
|
||||||
|
return DefaultS3ClientFactory.createEndpointConfiguration(endpoint,
|
||||||
|
new ClientConfiguration(), awsRegion);
|
||||||
|
}
|
||||||
|
}
|
|
@ -85,7 +85,7 @@ public class TestNetworkBinding extends AbstractHadoopTestBase {
|
||||||
final boolean expectNull,
|
final boolean expectNull,
|
||||||
final String expectRegion) {
|
final String expectRegion) {
|
||||||
AwsClientBuilder.EndpointConfiguration epr =
|
AwsClientBuilder.EndpointConfiguration epr =
|
||||||
createEndpointConfiguration(src, new ClientConfiguration());
|
createEndpointConfiguration(src, new ClientConfiguration(), src);
|
||||||
String eprStr = epr == null
|
String eprStr = epr == null
|
||||||
? "(empty)"
|
? "(empty)"
|
||||||
: ("(" + epr.getServiceEndpoint() + " " + epr.getSigningRegion());
|
: ("(" + epr.getServiceEndpoint() + " " + epr.getSigningRegion());
|
||||||
|
|
Loading…
Reference in New Issue