diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1d2122db61b..f94fdebd03e 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1590,6 +1590,14 @@
implementations can still be used
+<property>
+  <name>fs.s3a.accesspoint.required</name>
+  <value>false</value>
+  <description>Require that all S3 access is made through Access Points and not through
+    buckets directly. If enabled, use per-bucket overrides to allow bucket access to a specific set
+    of buckets.</description>
+</property>
+
  <name>fs.s3a.block.size</name>
  <value>32M</value>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java
new file mode 100644
index 00000000000..7c866ac9676
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ArnResource.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.annotation.Nonnull;
+
+import com.amazonaws.arn.Arn;
+import com.amazonaws.regions.RegionUtils;
+
+/**
+ * Represents an ARN resource; this can be an access point or a bucket.
+ */
+public final class ArnResource {
+
+ /**
+ * Resource name.
+ */
+ private final String name;
+
+ /**
+ * Resource owner account id.
+ */
+ private final String ownerAccountId;
+
+ /**
+ * Resource region.
+ */
+ private final String region;
+
+ /**
+ * Full Arn for the resource.
+ */
+ private final String fullArn;
+
+ /**
+ * Partition for the resource. Allowed partitions: aws, aws-cn, aws-us-gov
+ */
+ private final String partition;
+
+ /**
+ * Because of the different ways an endpoint can be constructed depending on partition we're
+ * relying on the AWS SDK to produce the endpoint. In this case we need a region key of the form
+ * {@code String.format("accesspoint-%s", awsRegion)}
+ */
+ private final String accessPointRegionKey;
+
+ private ArnResource(String name, String owner, String region, String partition, String fullArn) {
+ this.name = name;
+ this.ownerAccountId = owner;
+ this.region = region;
+ this.partition = partition;
+ this.fullArn = fullArn;
+ this.accessPointRegionKey = String.format("accesspoint-%s", region);
+ }
+
+ /**
+ * Resource name.
+ * @return resource name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Return owner's account id.
+ * @return owner account id
+ */
+ public String getOwnerAccountId() {
+ return ownerAccountId;
+ }
+
+ /**
+ * Resource region.
+ * @return resource region.
+ */
+ public String getRegion() {
+ return region;
+ }
+
+ /**
+ * Full arn for resource.
+ * @return arn for resource.
+ */
+ public String getFullArn() {
+ return fullArn;
+ }
+
+ /**
+ * Formatted endpoint for the resource.
+ * @return resource endpoint.
+ */
+ public String getEndpoint() {
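+ // For example, region "eu-west-1" gives the key "accesspoint-eu-west-1", which the SDK's
+ // RegionUtils resolves to the endpoint "s3-accesspoint.eu-west-1.amazonaws.com".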
+ return RegionUtils.getRegion(accessPointRegionKey)
+ .getServiceEndpoint("s3");
+ }
+
+ /**
+ * Parses the passed `arn` string into a full ArnResource.
+ * @param arn - string representing an Arn resource.
+ * @return new ArnResource instance.
+ * @throws IllegalArgumentException - if the Arn is malformed or any of the region, accountId and
+ * resource name properties are empty.
+ */
+ @Nonnull
+ public static ArnResource accessPointFromArn(String arn) throws IllegalArgumentException {
+ Arn parsed = Arn.fromString(arn);
+
+ if (parsed.getRegion().isEmpty() || parsed.getAccountId().isEmpty() ||
+ parsed.getResourceAsString().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format("Access Point Arn %s has an invalid format or missing properties", arn));
+ }
+
+ String resourceName = parsed.getResource().getResource();
+ return new ArnResource(resourceName, parsed.getAccountId(), parsed.getRegion(),
+ parsed.getPartition(), arn);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 461fe87d331..f51d9959ba0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1122,4 +1122,8 @@ public final class Constants {
*/
public static final String AWS_S3_CENTRAL_REGION = "us-east-1";
+ /**
+ * Require that all S3 access is made through Access Points.
+ */
+ public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 8ce11e23f03..5219b04f179 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -216,10 +216,15 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_INACCESSIBLE;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_S3GUARD_INCOMPATIBLE;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_S3GUARD_INCOMPATIBLE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
@@ -274,6 +279,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
Invoker.LOG_EVENT);
private final Retried onRetry = this::operationRetried;
+
+ /**
+ * Represents the bucket name for all S3 operations. If a per-bucket override for the
+ * {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to
+ * point to the configured ARN.
+ */
private String bucket;
private int maxKeys;
private Listing listing;
@@ -367,6 +378,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isCSEEnabled;
+ /**
+ * Bucket AccessPoint.
+ */
+ private ArnResource accessPoint;
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -408,10 +424,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("Initializing S3AFileSystem for {}", bucket);
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
-
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
conf, S3AFileSystem.class);
+ String arn = String.format(ARN_BUCKET_OPTION, bucket);
+ String configuredArn = conf.getTrimmed(arn, "");
+ if (!configuredArn.isEmpty()) {
+ accessPoint = ArnResource.accessPointFromArn(configuredArn);
+ LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
+ bucket = accessPoint.getFullArn();
+ } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
+ LOG.warn("Access Point usage is required because \"{}\" is enabled," +
+ " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
+ throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
+ }
// fix up the classloader of the configuration to be whatever
// classloader loaded this filesystem.
@@ -477,6 +503,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
"version 2", listVersion);
}
useListV1 = (listVersion == 1);
+ if (accessPoint != null && useListV1) {
+ LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported by" +
+ " access points. Upgrading to V2");
+ useListV1 = false;
+ }
signerManager = new SignerManager(bucket, this, conf, owner);
signerManager.initCustomSigners();
@@ -554,6 +585,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
if (isCSEEnabled) {
throw new PathIOException(uri.toString(), CSE_S3GUARD_INCOMPATIBLE);
}
+ if (accessPoint != null) {
+ throw new PathIOException(uri.toString(), AP_S3GUARD_INCOMPATIBLE);
+ }
}
// LOG if S3Guard is disabled on the warn level set in config
@@ -741,11 +775,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
@Retries.RetryTranslated
protected void verifyBucketExistsV2()
- throws UnknownStoreException, IOException {
+ throws UnknownStoreException, IOException {
if (!invoker.retry("doesBucketExistV2", bucket, true,
trackDurationOfOperation(getDurationTrackerFactory(),
STORE_EXISTS_PROBE.getSymbol(),
- () -> s3.doesBucketExistV2(bucket)))) {
+ () -> {
+ // A bug in the SDK means `doesBucketExistV2()` always returns `true` for AccessPoint ARNs,
+ // so expand the implementation to handle both ARNs and bucket names correctly.
+ try {
+ s3.getBucketAcl(bucket);
+ } catch (AmazonServiceException ex) {
+ int statusCode = ex.getStatusCode();
+ if (statusCode == SC_404 ||
+ (statusCode == SC_403 && ex.getMessage().contains(AP_INACCESSIBLE))) {
+ return false;
+ }
+ }
+
+ return true;
+ }))) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
+ "not exist");
}
@@ -833,10 +881,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ String endpoint = accessPoint == null
+ ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
+ : accessPoint.getEndpoint();
+
S3ClientFactory.S3ClientCreationParameters parameters = null;
parameters = new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
- .withEndpoint(conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))
+ .withEndpoint(endpoint)
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
.withUserAgentSuffix(uaSuffix)
@@ -1165,7 +1217,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final String region = trackDurationAndSpan(
STORE_EXISTS_PROBE, bucketName, null, () ->
invoker.retry("getBucketLocation()", bucketName, true, () ->
- s3.getBucketLocation(bucketName)));
+ // If an access point is in use, the region is already known from the ARN
+ accessPoint != null
+ ? accessPoint.getRegion()
+ : s3.getBucketLocation(bucketName)));
return fixBucketRegion(region);
}
@@ -4547,6 +4602,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.append("}");
}
sb.append(", ClientSideEncryption=").append(isCSEEnabled);
+
+ if (accessPoint != null) {
+ sb.append(", arnForBucket=").append(accessPoint.getFullArn());
+ }
sb.append('}');
return sb.toString();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index cb4e6ac9173..3a7720054cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -472,6 +472,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
/**
* Create an IOStatistics store which updates FS metrics
* as well as IOStatistics.
+ * @return instance of the store.
*/
public IOStatisticsStore createMetricsUpdatingStore() {
return new MetricsUpdatingIOStatisticsStore();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 4dd52d9b88d..286ec2125a7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -95,6 +95,9 @@ public final class InternalConstants {
Arrays.asList(Constants.INPUT_FADVISE,
Constants.READAHEAD_RANGE)));
+ /** 403 error code. */
+ public static final int SC_403 = 403;
+
/** 404 error code. */
public static final int SC_404 = 404;
@@ -139,4 +142,26 @@ public final class InternalConstants {
*/
public static final String CSE_S3GUARD_INCOMPATIBLE = "S3-CSE cannot be "
+ "used with S3Guard";
+
+ /**
+ * Error message to indicate Access Points are incompatible with S3Guard.
+ */
+ public static final String AP_S3GUARD_INCOMPATIBLE = "Access Points cannot be used with S3Guard";
+
+ /**
+ * Error message to indicate Access Points are required to be used for S3 access.
+ */
+ public static final String AP_REQUIRED_EXCEPTION = "Access Point usage is required" +
+ " but not configured for the bucket.";
+
+ /**
+ * Error message to indicate Access Points are not accessible or don't exist.
+ */
+ public static final String AP_INACCESSIBLE = "Could not access through this access point";
+
+ /**
+ * AccessPoint ARN for the bucket. When set as a per-bucket override, requests for that bucket
+ * will go through the AccessPoint.
+ */
+ public static final String ARN_BUCKET_OPTION = "fs.s3a.bucket.%s.accesspoint.arn";
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 3d08a1a3ea7..a5c67c16b0b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1595,6 +1595,62 @@ Why explicitly declare a bucket bound to the central endpoint? It ensures
that if the default endpoint is changed to a new region, data store in
US-east is still reachable.
+## <a name="accesspoints"></a> Configuring S3 AccessPoints usage with S3A
+S3a now supports [S3 Access Point](https://aws.amazon.com/s3/features/access-points/) usage, which
+improves VPC integration with S3 and simplifies your data's permission model, because different
+policies can now be applied at the Access Point level. For more information about why to use them
+and how to create them, make sure to read the official documentation.
+
+Accessing data through an access point is done by using its ARN instead of just the bucket name.
+You can set the Access Point ARN property using the following per-bucket configuration property:
+```xml
+<property>
+  <name>fs.s3a.bucket.sample-bucket.accesspoint.arn</name>
+  <value>{ACCESSPOINT_ARN_HERE}</value>
+  <description>Configure S3a traffic to use this AccessPoint</description>
+</property>
+```
+
+This configures access to the `sample-bucket` bucket for S3A to go through the
+configured Access Point ARN. So, for example, `s3a://sample-bucket/key` will now use your
+configured ARN when getting data from S3 instead of your bucket.
+
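+The client-side code does not change. A minimal sketch, assuming the per-bucket ARN above has been
+set (the bucket name and ARN are placeholders):
+
+```java
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class AccessPointReadExample {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // per-bucket override: all requests for this bucket go through the Access Point
+    conf.set("fs.s3a.bucket.sample-bucket.accesspoint.arn",
+        "arn:aws:s3:eu-west-1:123456789012:accesspoint/sample-ap");
+    // paths still use the bucket name; the S3A client routes the requests to the Access Point
+    try (FileSystem fs = FileSystem.get(new URI("s3a://sample-bucket/"), conf)) {
+      System.out.println(fs.getFileStatus(new Path("/key")));
+    }
+  }
+}
+```
+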
+You can also use an Access Point name as a path URI such as `s3a://finance-team-access/key`, by
+configuring the `.accesspoint.arn` property as a per-bucket override:
+```xml
+<property>
+  <name>fs.s3a.bucket.finance-team-access.accesspoint.arn</name>
+  <value>{ACCESSPOINT_ARN_HERE}</value>
+  <description>Configure S3a traffic to use this AccessPoint</description>
+</property>
+```
+
+The `fs.s3a.accesspoint.required` property can also be used to require that all S3 access goes
+through Access Points. This has the advantage of increasing security inside a VPN / VPC, as you only
+allow access to known sources of data defined through Access Points. If a bucket needs to be
+accessed directly (without Access Points), you can use a per-bucket override to disable this setting
+on a bucket-by-bucket basis, i.e. `fs.s3a.bucket.{YOUR-BUCKET}.accesspoint.required`.
+
+```xml
+<property>
+  <name>fs.s3a.accesspoint.required</name>
+  <value>true</value>
+</property>
+
+<property>
+  <name>fs.s3a.bucket.example-bucket.accesspoint.required</name>
+  <value>false</value>
+</property>
+```
+
+Before using Access Points make sure you're not impacted by the following:
+- `ListObjectsV1` is not supported; it is also deprecated on AWS S3 for performance reasons;
+- The endpoint for S3 requests will automatically change from `s3.amazonaws.com` to
+`s3-accesspoint.REGION.amazonaws.{com | com.cn}`, depending on the Access Point ARN. If you have
+any custom signers that use the host endpoint property, make sure to update them if needed
+(see the sketch below);
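+
+As a rough illustration of the endpoint change, the endpoint an Access Point ARN resolves to can be
+checked with the `ArnResource` helper added for this feature (an internal S3A class, not a public
+API; the account id and access point name below are placeholders):
+
+```java
+import org.apache.hadoop.fs.s3a.ArnResource;
+
+public class ShowAccessPointEndpoint {
+  public static void main(String[] args) {
+    String arn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/sample-ap";
+    ArnResource resource = ArnResource.accessPointFromArn(arn);
+    // prints "s3-accesspoint.eu-west-1.amazonaws.com" for the ARN above
+    System.out.println(resource.getEndpoint());
+  }
+}
+```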
+
## How S3A writes data to S3
The original S3A client implemented file writes by
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index e0a90a243c0..f5da283db36 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -296,6 +296,15 @@ For the default test dataset, hosted in the `landsat-pds` bucket, this is:
```
+### Testing Access Point Integration
+S3a supports using Access Point ARNs to access data in S3. If you think your changes affect VPC
+integration, request signing, ARN manipulation, or any code path that deals with the actual
+sending and retrieving of data to/from S3, make sure you run the entire integration test suite with
+this feature enabled.
+
+Check out [our documentation](./index.html#accesspoints) for steps on how to enable this feature. To
+create access points for your S3 bucket you can use the AWS Console or CLI.
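+
+A minimal sketch of how a test can point a bucket at an Access Point programmatically, mirroring
+what `ITestS3ABucketExistence` does in this patch (the helper class below is hypothetical; the
+bucket name and ARN are placeholders):
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public final class AccessPointTestConfig {
+  private AccessPointTestConfig() {
+  }
+
+  /** Set the same per-bucket key that S3AFileSystem reads during initialize(). */
+  public static Configuration withAccessPoint(
+      Configuration conf, String bucket, String arn) {
+    conf.set(String.format("fs.s3a.bucket.%s.accesspoint.arn", bucket), arn);
+    return conf;
+  }
+}
+```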
+
## Viewing Integration Test Reports
@@ -1468,6 +1477,9 @@ as it may take a couple of SDK updates before it is ready.
in `fs.s3a.assumed.role.arn` for testing assumed roles,
and `fs.s3a.encryption.key` for encryption, for full coverage.
If you can, scale up the scale tests.
+1. Create an Access Point for your bucket (using the AWS Console or CLI), update S3a configuration
+to use it ([docs for help](./index.html#accesspoints)) and re-run the `ITest*` integration tests from
+your IDE or via maven.
1. Run the `ILoadTest*` load tests from your IDE or via maven through
`mvn verify -Dtest=skip -Dit.test=ILoadTest\*` ; look for regressions in performance
as much as failures.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
index 695a4a2b682..fb98657227a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/S3AContract.java
@@ -26,8 +26,9 @@ import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipIfS3GuardAndS3CSEIOE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfIOEContainsMessage;
/**
* The contract of S3A: only enabled if the test bucket is provided.
@@ -77,8 +78,10 @@ public class S3AContract extends AbstractBondedFSContract {
try {
super.init();
} catch (PathIOException ioe) {
- // Skip the tests if S3-CSE and S3-Guard are enabled.
- maybeSkipIfS3GuardAndS3CSEIOE(ioe);
+ // Skip the tests if (S3-CSE or Access Points) and S3-Guard are enabled.
+ skipIfIOEContainsMessage(ioe,
+ InternalConstants.CSE_S3GUARD_INCOMPATIBLE,
+ InternalConstants.AP_S3GUARD_INCOMPATIBLE);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
index 934ad29ed25..a9c5e6dd43e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java
@@ -27,11 +27,14 @@ import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESSPOINT_REQUIRED;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_METASTORE_NULL;
@@ -161,6 +164,35 @@ public class ITestS3ABucketExistence extends AbstractS3ATestBase {
() -> FileSystem.get(uri, configuration));
}
+ @Test
+ public void testAccessPointProbingV2() throws Exception {
+ describe("Test V2 bucket probing using an AccessPoint ARN");
+ Configuration configuration = createConfigurationWithProbe(2);
+ String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
+ configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
+ accessPointArn);
+
+ expectUnknownStore(
+ () -> FileSystem.get(uri, configuration));
+ }
+
+ @Test
+ public void testAccessPointRequired() throws Exception {
+ describe("Test V2 bucket probing with 'fs.s3a.accesspoint.required' property.");
+ Configuration configuration = createConfigurationWithProbe(2);
+ configuration.set(AWS_S3_ACCESSPOINT_REQUIRED, "true");
+ intercept(PathIOException.class,
+ InternalConstants.AP_REQUIRED_EXCEPTION,
+ "Should throw IOException if Access Points are required but not configured.",
+ () -> FileSystem.get(uri, configuration));
+
+ String accessPointArn = "arn:aws:s3:eu-west-1:123456789012:accesspoint/" + randomBucket;
+ configuration.set(String.format(InternalConstants.ARN_BUCKET_OPTION, randomBucket),
+ accessPointArn);
+ expectUnknownStore(
+ () -> FileSystem.get(uri, configuration));
+ }
+
@Override
protected Configuration getConfiguration() {
Configuration configuration = super.getConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index dce99a6144f..906cadd5024 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -65,6 +65,8 @@ public class ITestS3AConfiguration {
private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
private static final String EXAMPLE_KEY =
"RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
+ private static final String AP_ILLEGAL_ACCESS =
+ "ARN of type accesspoint cannot be passed as a bucket";
private Configuration conf;
private S3AFileSystem fs;
@@ -360,6 +362,14 @@ public class ITestS3AConfiguration {
// isn't in the same region as the s3 client default. See
// http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
+ } catch (final IllegalArgumentException e) {
+ // Path style addressing does not work with AP ARNs
+ if (!fs.getBucket().contains("arn:")) {
+ LOG.error("Caught unexpected exception: ", e);
+ throw e;
+ }
+
+ GenericTestUtils.assertExceptionContains(AP_ILLEGAL_ACCESS, e);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 113b68ae6c6..1156b8fe6c0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
-import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
@@ -262,17 +261,19 @@ public final class S3ATestUtils {
}
/**
- * Either skip if PathIOE occurred due to S3CSE and S3Guard
- * incompatibility or throw the PathIOE.
+ * Skip a test if the PathIOE contains a message which signals
+ * an incompatibility; otherwise rethrow the PathIOE.
*
* @param ioe PathIOE being parsed.
- * @throws PathIOException Throws PathIOE if it doesn't relate to S3CSE
- * and S3Guard incompatibility.
+ * @param messages messages found in the PathIOE that trigger a test to skip
+ * @throws PathIOException Throws PathIOE if it doesn't relate to any message in {@code messages}.
*/
- public static void maybeSkipIfS3GuardAndS3CSEIOE(PathIOException ioe)
+ public static void skipIfIOEContainsMessage(PathIOException ioe, String...messages)
throws PathIOException {
- if (ioe.toString().contains(InternalConstants.CSE_S3GUARD_INCOMPATIBLE)) {
- skip("Skipping since CSE is enabled with S3Guard.");
+ for (String message: messages) {
+ if (ioe.toString().contains(message)) {
+ skip("Skipping because: " + message);
+ }
}
throw ioe;
}
@@ -1562,5 +1563,4 @@ public final class S3ATestUtils {
+ " in " + secrets);
}
}
-
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java
new file mode 100644
index 00000000000..97069c7c8a1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestArnResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.regions.Regions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class TestArnResource extends HadoopTestBase {
+ private final static Logger LOG = LoggerFactory.getLogger(TestArnResource.class);
+
+ @Test
+ public void parseAccessPointFromArn() throws IllegalArgumentException {
+ describe("Parse AccessPoint ArnResource from arn string");
+
+ String accessPoint = "testAp";
+ String accountId = "123456789101";
+ String[][] regionPartitionEndpoints = new String[][] {
+ {Regions.EU_WEST_1.getName(), "aws", "s3-accesspoint.eu-west-1.amazonaws.com"},
+ {Regions.US_GOV_EAST_1.getName(), "aws-us-gov",
+ "s3-accesspoint.us-gov-east-1.amazonaws.com"},
+ {Regions.CN_NORTH_1.getName(), "aws-cn", "s3-accesspoint.cn-north-1.amazonaws.com.cn"},
+ };
+
+ for (String[] testPair : regionPartitionEndpoints) {
+ String region = testPair[0];
+ String partition = testPair[1];
+ String endpoint = testPair[2];
+
+ // arn:partition:service:region:account-id:resource-type/resource-id
+ String arn = String.format("arn:%s:s3:%s:%s:accesspoint/%s", partition, region, accountId,
+ accessPoint);
+
+ ArnResource resource = ArnResource.accessPointFromArn(arn);
+ assertEquals("Arn does not match", arn, resource.getFullArn());
+ assertEquals("Access Point name does not match", accessPoint, resource.getName());
+ assertEquals("Account Id does not match", accountId, resource.getOwnerAccountId());
+ assertEquals("Region does not match", region, resource.getRegion());
+ assertEquals("Endpoint does not match", endpoint, resource.getEndpoint());
+ }
+ }
+
+ @Test
+ public void invalidARNsMustThrow() throws Exception {
+ describe("Using an invalid ARN format must throw when initializing an ArnResource.");
+
+ intercept(IllegalArgumentException.class, () ->
+ ArnResource.accessPointFromArn("invalid:arn:resource"));
+ }
+
+ private void describe(String message) {
+ LOG.info(message);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
index 287fe51b5ea..9e6d82ce6ac 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/ITestAuditManager.java
@@ -121,8 +121,8 @@ public class ITestAuditManager extends AbstractS3ACostTest {
final S3AFileSystem fs = getFileSystem();
final long exec0 = lookupCounterStatistic(iostats(),
AUDIT_REQUEST_EXECUTION.getSymbol());
- // API call
- fs.getBucketLocation();
+ // API call to a known path; `getBucketLocation()` does not always result in an API call.
+ fs.listStatus(path("/"));
// which MUST have ended up calling the extension request handler
Assertions.assertThat(SimpleAWSRequestHandler.getInvocationCount())
.describedAs("Invocation count of plugged in request handler")
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
index 22ce1e91c29..cbba326d5ec 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.auth;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWS4Signer;
+import com.amazonaws.arn.Arn;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.services.s3.internal.AWSS3V4Signer;
@@ -143,7 +145,7 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
conf.set(TEST_ID_KEY, identifier);
conf.set(TEST_REGION_KEY, regionName);
- conf.set(Constants.ENDPOINT, endpoint);
+
// make absolutely sure there is no caching.
disableFilesystemCaching(conf);
@@ -190,7 +192,7 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
LOG.info("Signing request #{}", c);
String host = request.getEndpoint().getHost();
- String bucketName = host.split("\\.")[0];
+ String bucketName = parseBucketFromHost(host);
try {
lastStoreValue = CustomSignerInitializer
.getStoreValue(bucketName, UserGroupInformation.getCurrentUser());
@@ -214,6 +216,35 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
}
}
+ private String parseBucketFromHost(String host) {
+ String[] hostBits = host.split("\\.");
+ String bucketName = hostBits[0];
+ String service = hostBits[1];
+
+ if (bucketName.equals("kms")) {
+ return bucketName;
+ }
+
+ if (service.contains("s3-accesspoint") || service.contains("s3-outposts")
+ || service.contains("s3-object-lambda")) {
+ // For an Access Point, the first host label is of the form `{accessPointName}-{accountId}`;
+ String[] accessPointBits = hostBits[0].split("-");
+ int lastElem = accessPointBits.length - 1;
+ String accountId = accessPointBits[lastElem];
+ String accessPointName = String.join("", Arrays.copyOf(accessPointBits, lastElem));
+ Arn arn = Arn.builder()
+ .withAccountId(accountId)
+ .withPartition("aws")
+ .withRegion(hostBits[2])
+ .withResource("accesspoint" + "/" + accessPointName)
+ .withService("s3").build();
+
+ bucketName = arn.toString();
+ }
+
+ return bucketName;
+ }
+
public static int getInstantiationCount() {
return INSTANTIATION_COUNT.get();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index 99a64e6a942..45639dae7b7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
+import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -125,6 +126,14 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
String bucketName = getTestBucketName(conf);
+ // If an AccessPoint ARN is set, guarded tests are skipped
+ String arnKey = String.format(InternalConstants.ARN_BUCKET_OPTION, bucketName);
+ String arn = conf.getTrimmed(arnKey, "");
+ if (isGuarded() && !arn.isEmpty()) {
+ ContractTestUtils.skip(
+ "Skipping test since AccessPoint ARN is set and is incompatible with S3Guard.");
+ }
+
removeBucketOverrides(bucketName, conf,
S3_METADATA_STORE_IMPL);
if (!isGuarded()) {
@@ -146,6 +155,12 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
}
disableFilesystemCaching(conf);
+
+ // AccessPoint ARN is the only per bucket configuration that must be kept.
+ if (!arn.isEmpty()) {
+ conf.set(arnKey, arn);
+ }
+
return conf;
}