HADOOP-13336 S3A to support per-bucket configuration. Contributed by Steve Loughran
parent 357eab9566
commit e648b6e138
@@ -954,6 +954,15 @@
  </description>
</property>

<property>
  <name>fs.s3a.security.credential.provider.path</name>
  <value />
  <description>
    Optional comma separated list of credential providers, a list
    which is prepended to that set in hadoop.security.credential.provider.path
  </description>
</property>

<property>
  <name>fs.s3a.connection.maximum</name>
  <value>15</value>
@@ -48,6 +48,14 @@ public final class Constants {
  public static final String AWS_CREDENTIALS_PROVIDER =
      "fs.s3a.aws.credentials.provider";

  /**
   * Extra set of security credentials which will be prepended to that
   * set in {@code "hadoop.security.credential.provider.path"}.
   * This extra option allows for per-bucket overrides.
   */
  public static final String S3A_SECURITY_CREDENTIAL_PROVIDER_PATH =
      "fs.s3a.security.credential.provider.path";

  // session token for when using TemporaryAWSCredentialsProvider
  public static final String SESSION_TOKEN = "fs.s3a.session.token";

@@ -226,6 +234,12 @@ public final class Constants {
  public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
  public static final String FS_S3A = "s3a";

  /** Prefix for all S3A properties: {@value}. */
  public static final String FS_S3A_PREFIX = "fs.s3a.";

  /** Prefix for S3A bucket-specific properties: {@value}. */
  public static final String FS_S3A_BUCKET_PREFIX = "fs.s3a.bucket.";

  public static final int S3A_DEFAULT_PORT = -1;

  public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
@@ -29,7 +29,6 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@@ -153,21 +152,28 @@ public class S3AFileSystem extends FileSystem {
  /** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
   *   for this FileSystem
   * @param conf the configuration
   * @param originalConf the configuration to use for the FS. The
   *   bucket-specific options are patched over the base ones before any use is
   *   made of the config.
   */
  public void initialize(URI name, Configuration conf) throws IOException {
  public void initialize(URI name, Configuration originalConf)
      throws IOException {
    uri = S3xLoginHelper.buildFSURI(name);
    // get the host; this is guaranteed to be non-null, non-empty
    bucket = name.getHost();
    // clone the configuration into one with propagated bucket options
    Configuration conf = propagateBucketOptions(originalConf, bucket);
    patchSecurityCredentialProviders(conf);
    super.initialize(name, conf);
    setConf(conf);
    try {
      instrumentation = new S3AInstrumentation(name);

      uri = S3xLoginHelper.buildFSURI(name);
      // Username is the current user at the time the FS was instantiated.
      username = UserGroupInformation.getCurrentUser().getShortUserName();
      workingDir = new Path("/user", username)
          .makeQualified(this.uri, this.getWorkingDirectory());

      bucket = name.getHost();

      Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
          S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
@@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.security.ProviderUtils;

import com.google.common.collect.Lists;
import org.slf4j.Logger;

import java.io.EOFException;

@@ -46,15 +48,13 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
import static org.apache.hadoop.fs.s3a.Constants.*;

/**
 * Utility methods for S3A code.

@@ -74,6 +74,13 @@ public final class S3AUtils {
      "is abstract and therefore cannot be created";
  static final String ENDPOINT_KEY = "Endpoint";

  /**
   * Core property for provider path. Duplicated here for consistent
   * code across Hadoop version: {@value}.
   */
  static final String CREDENTIAL_PROVIDER_PATH =
      "hadoop.security.credential.provider.path";

  private S3AUtils() {
  }
@@ -636,4 +643,84 @@ public final class S3AUtils {
    return null;
  }
}

  /**
   * Propagates bucket-specific settings into generic S3A configuration keys.
   * This is done by propagating the values of the form
   * {@code fs.s3a.bucket.${bucket}.key} to
   * {@code fs.s3a.key}, for all values of "key" other than a small set
   * of unmodifiable values.
   *
   * The source of the updated property is set to the key name of the bucket
   * property, to aid in diagnostics of where things came from.
   *
   * Returns a new configuration. Why the clone?
   * You can use the same conf for different filesystems, and the original
   * values are not updated.
   *
   * The {@code fs.s3a.impl} property cannot be set, nor can
   * any with the prefix {@code fs.s3a.bucket}.
   *
   * This method does not propagate security provider path information from
   * the S3A property into the Hadoop common provider: callers must call
   * {@link #patchSecurityCredentialProviders(Configuration)} explicitly.
   * @param source Source Configuration object.
   * @param bucket bucket name. Must not be empty.
   * @return a (potentially) patched clone of the original.
   */
  public static Configuration propagateBucketOptions(Configuration source,
      String bucket) {

    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket");
    final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.';
    LOG.debug("Propagating entries under {}", bucketPrefix);
    final Configuration dest = new Configuration(source);
    for (Map.Entry<String, String> entry : source) {
      final String key = entry.getKey();
      // get the (unexpanded) value.
      final String value = entry.getValue();
      if (!key.startsWith(bucketPrefix) || bucketPrefix.equals(key)) {
        continue;
      }
      // there's a bucket prefix, so strip it
      final String stripped = key.substring(bucketPrefix.length());
      if (stripped.startsWith("bucket.") || "impl".equals(stripped)) {
        // tell user off
        LOG.debug("Ignoring bucket option {}", key);
      } else {
        // propagate the value, building a new origin field.
        // to track overwrites, the generic key is overwritten even if
        // already matches the new one.
        final String generic = FS_S3A_PREFIX + stripped;
        LOG.debug("Updating {}", generic);
        dest.set(generic, value, key);
      }
    }
    return dest;
  }

  /**
   * Patch the security credential provider information in
   * {@link #CREDENTIAL_PROVIDER_PATH}
   * with the providers listed in
   * {@link Constants#S3A_SECURITY_CREDENTIAL_PROVIDER_PATH}.
   *
   * This allows different buckets to use different credential files.
   * @param conf configuration to patch
   */
  static void patchSecurityCredentialProviders(Configuration conf) {
    Collection<String> customCredentials = conf.getStringCollection(
        S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
    Collection<String> hadoopCredentials = conf.getStringCollection(
        CREDENTIAL_PROVIDER_PATH);
    if (!customCredentials.isEmpty()) {
      List<String> all = Lists.newArrayList(customCredentials);
      all.addAll(hadoopCredentials);
      String joined = StringUtils.join(all, ',');
      LOG.debug("Setting {} to {}", CREDENTIAL_PROVIDER_PATH,
          joined);
      conf.set(CREDENTIAL_PROVIDER_PATH, joined,
          "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
    }
  }
}
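To make the intent of the two new helpers concrete, here is a small, hypothetical usage sketch. It is not part of the patch: the bucket name and endpoint are invented, and in the patch itself the calls are made from `S3AFileSystem.initialize()`.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;

/** Hypothetical usage sketch of propagateBucketOptions(); not part of the patch. */
public class BucketOptionSketch {
  public static void main(String[] args) {
    Configuration base = new Configuration(false);
    // Invented bucket-specific override for a "nightly" bucket.
    base.set("fs.s3a.bucket.nightly.endpoint", "s3.eu-west-1.amazonaws.com");

    // Clone the config, copying fs.s3a.bucket.nightly.* values down to fs.s3a.*.
    Configuration patched = S3AUtils.propagateBucketOptions(base, "nightly");

    // The generic key now carries the bucket-specific value,
    // while the original configuration is left untouched.
    System.out.println(patched.get("fs.s3a.endpoint")); // s3.eu-west-1.amazonaws.com
    System.out.println(base.get("fs.s3a.endpoint"));    // null

    // patchSecurityCredentialProviders(Configuration) is package-private; in the
    // patch it is invoked from S3AFileSystem.initialize() to prepend any
    // fs.s3a.security.credential.provider.path entries to
    // hadoop.security.credential.provider.path.
  }
}
```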
@@ -255,6 +255,7 @@ properties, the Hadoop key management store and IAM roles.
* Test suites include distcp and suites in downstream projects.
* Available since Hadoop 2.6; considered production ready in Hadoop 2.7.
* Actively maintained.
* Supports per-bucket configuration.

S3A is now the recommended client for working with S3 objects. It is also the
one where patches for functionality and performance are very welcome.

@@ -609,6 +610,29 @@ in XML configuration files.
Because this property only supplies the path to the secrets file, the configuration
option itself is no longer a sensitive item.

The property `hadoop.security.credential.provider.path` is global to all
filesystems and secrets.
There is another property, `fs.s3a.security.credential.provider.path`,
which only lists credential providers for S3A filesystems.
The two properties are combined into one, with the list of providers in the
`fs.s3a.` property taking precedence
over that of the `hadoop.security` list (i.e. they are prepended to the common list).

```xml
<property>
  <name>fs.s3a.security.credential.provider.path</name>
  <value />
  <description>
    Optional comma separated list of credential providers, a list
    which is prepended to that set in hadoop.security.credential.provider.path
  </description>
</property>
```

Supporting a separate list in an `fs.s3a.` prefix permits per-bucket configuration
of credential files.
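As a rough illustration of the precedence rule (the JCEKS URIs below are invented), when both properties are set, the effective provider list used by S3A is the `fs.s3a.` entries followed by the common ones:

```xml
<!-- Hypothetical example only: the JCEKS URIs are invented.
     For S3A the effective provider path becomes
     jceks://hdfs@nn1.example.com:9001/secure/s3.jceks,jceks://hdfs@nn1.example.com:9001/secure/common.jceks -->
<property>
  <name>hadoop.security.credential.provider.path</name>
  <value>jceks://hdfs@nn1.example.com:9001/secure/common.jceks</value>
</property>

<property>
  <name>fs.s3a.security.credential.provider.path</name>
  <value>jceks://hdfs@nn1.example.com:9001/secure/s3.jceks</value>
</property>
```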


###### Using the credentials

Once the provider is set in the Hadoop configuration, hadoop commands

@@ -631,7 +655,7 @@ hadoop distcp \
hdfs://nn1.example.com:9001/user/backup/007020615 s3a://glacier1/

hadoop fs \
  -D hadoop.security.credential.provider.path=jceks://hdfs@nn1.example.com:9001/user/backup/s3.jceks \
  -D fs.s3a.security.credential.provider.path=jceks://hdfs@nn1.example.com:9001/user/backup/s3.jceks \
  -ls s3a://glacier1/

```
@@ -869,6 +893,78 @@ from placing its declaration on the command line.
  any call to setReadahead() is made to an open stream.</description>
</property>

### Configuring different S3 buckets

Different S3 buckets can be accessed with different S3A client configurations.
This allows for different endpoints, data read and write strategies, as well
as login details.

1. All `fs.s3a` options other than a small set of unmodifiable values
   (currently `fs.s3a.impl`) can be set on a per-bucket basis.
1. The bucket-specific option is set by replacing the `fs.s3a.` prefix on an option
with `fs.s3a.bucket.BUCKETNAME.`, where `BUCKETNAME` is the name of the bucket.
1. When connecting to a bucket, all options explicitly set will override
the base `fs.s3a.` values.

As an example, a base configuration could use the IAM
role information available when deployed in Amazon EC2.

```xml
<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
</property>
```

This will be the default authentication mechanism for S3A buckets.

A bucket `s3a://nightly/` used for nightly data uses a session key:

```xml
<property>
  <name>fs.s3a.bucket.nightly.access.key</name>
  <value>AKAACCESSKEY-2</value>
</property>

<property>
  <name>fs.s3a.bucket.nightly.secret.key</name>
  <value>SESSIONSECRETKEY</value>
</property>

<property>
  <name>fs.s3a.bucket.nightly.session.token</name>
  <value>Short-lived-session-token</value>
</property>

<property>
  <name>fs.s3a.bucket.nightly.aws.credentials.provider</name>
  <value>org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider</value>
</property>
```

Finally, the public `s3a://landsat-pds/` bucket is accessed anonymously:

```xml
<property>
  <name>fs.s3a.bucket.landsat-pds.aws.credentials.provider</name>
  <value>org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider</value>
</property>
```

**Customizing S3A secrets held in credential files**

Although most properties are automatically propagated from their
`fs.s3a.bucket.`-prefixed custom entry to that of the base `fs.s3a.` option,
supporting secrets kept in Hadoop credential files is slightly more complex.
This is because the property values are kept in these files, and cannot be
dynamically patched.

Instead, callers need to create different configuration files for each
bucket, setting the base secrets (`fs.s3a.bucket.nightly.access.key`, etc),
then declare the path to the appropriate credential file in
a bucket-specific version of the property `fs.s3a.security.credential.provider.path`.
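For instance, a hypothetical sketch for the `s3a://nightly/` bucket described above (the JCEKS path is illustrative only) could be:

```xml
<!-- Hypothetical sketch: point the nightly bucket at its own credential file.
     The JCEKS URI is illustrative only. -->
<property>
  <name>fs.s3a.bucket.nightly.security.credential.provider.path</name>
  <value>jceks://hdfs@nn1.example.com:9001/user/backup/nightly.jceks</value>
</property>
```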


### Working with buckets in different regions

S3 Buckets are hosted in different regions, the default being US-East.

@@ -924,6 +1020,16 @@ If the wrong endpoint is used, the request may fail. This may be reported as a 3
or as a 400 Bad Request.


If you are trying to mix endpoints for different buckets, use a per-bucket endpoint
declaration. For example:

```xml
<property>
  <name>fs.s3a.bucket.landsat-pds.endpoint</name>
  <value>s3.amazonaws.com</value>
  <description>The endpoint for s3a://landsat-pds URLs</description>
</property>
```

### <a name="s3a_fast_upload"></a>Stabilizing: S3A Fast Upload
@@ -1603,15 +1709,15 @@ org.apache.hadoop.fs.s3a.AWSS3IOException: Received permanent redirect response
1. If not using "V4" authentication (see above), the original S3 endpoint
can be used:

```
<property>
```xml
<property>
  <name>fs.s3a.endpoint</name>
  <value>s3.amazonaws.com</value>
</property>
</property>
```

Using the explicit endpoint for the region is recommended for speed and the
ability to use the V4 signing API.
Using the explicit endpoint for the region is recommended for speed and
to use the V4 signing API.


### "Timeout waiting for connection from pool" when writing to S3A

@@ -2163,7 +2269,22 @@ is hosted in Amazon's US-east datacenter.
1. If the property is set to a different path, then that data must be readable
and "sufficiently" large.

To test on different S3 endpoints, or alternate infrastructures supporting
(the reason the space or newline is needed is to add "an empty entry"; an empty
`<value/>` would be considered undefined and pick up the default)

If using a test file in an S3 region requiring a different endpoint value
set in `fs.s3a.endpoint`, a bucket-specific endpoint must be defined.
For the default test dataset, hosted in the `landsat-pds` bucket, this is:

```xml
<property>
  <name>fs.s3a.bucket.landsat-pds.endpoint</name>
  <value>s3.amazonaws.com</value>
  <description>The endpoint for s3a://landsat-pds URLs</description>
</property>
```

To test on alternate infrastructures supporting
the same APIs, the option `fs.s3a.scale.test.csvfile` must either be
set to " ", or an object of at least 10MB is uploaded to the object store, and
the `fs.s3a.scale.test.csvfile` option set to its path.
@@ -2175,20 +2296,6 @@ the `fs.s3a.scale.test.csvfile` option set to its path.
</property>
```

(the reason the space or newline is needed is to add "an empty entry"; an empty
`<value/>` would be considered undefined and pick up the default)

*Note:* if using a test file in an S3 region requiring a different endpoint value
set in `fs.s3a.endpoint`, define it in `fs.s3a.scale.test.csvfile.endpoint`.
If the default CSV file is used, the tests will automatically use the us-east
endpoint:

```xml
<property>
  <name>fs.s3a.scale.test.csvfile.endpoint</name>
  <value>s3.amazonaws.com</value>
</property>
```
### Viewing Integration Test Reports


@@ -129,7 +129,6 @@ public class ITestS3AAWSCredentialsProvider {
        AnonymousAWSCredentialsProvider.class.getName());
    Path testFile = new Path(
        conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
    S3ATestUtils.useCSVDataEndpoint(conf);
    FileSystem fs = FileSystem.newInstance(testFile.toUri(), conf);
    assertNotNull(fs);
    assertTrue(fs instanceof S3AFileSystem);

@@ -35,6 +35,7 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;

@@ -45,6 +46,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;

import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;

@@ -54,6 +56,10 @@ import org.apache.hadoop.util.VersionInfo;
import org.apache.http.HttpStatus;
import org.junit.rules.TemporaryFolder;

import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;

/**
 * S3A tests for configuration.
 */
@@ -505,4 +511,128 @@ public class ITestS3AConfiguration {
        fieldType.isAssignableFrom(obj.getClass()));
    return fieldType.cast(obj);
  }

  @Test
  public void testBucketConfigurationPropagation() throws Throwable {
    Configuration config = new Configuration(false);
    setBucketOption(config, "b", "base", "1024");
    String basekey = "fs.s3a.base";
    assertOptionEquals(config, basekey, null);
    String bucketKey = "fs.s3a.bucket.b.base";
    assertOptionEquals(config, bucketKey, "1024");
    Configuration updated = propagateBucketOptions(config, "b");
    assertOptionEquals(updated, basekey, "1024");
    // original conf is not updated
    assertOptionEquals(config, basekey, null);

    String[] sources = updated.getPropertySources(basekey);
    assertEquals(1, sources.length);
    String sourceInfo = sources[0];
    assertTrue("Wrong source " + sourceInfo, sourceInfo.contains(bucketKey));
  }

  @Test
  public void testBucketConfigurationPropagationResolution() throws Throwable {
    Configuration config = new Configuration(false);
    String basekey = "fs.s3a.base";
    String baseref = "fs.s3a.baseref";
    String baseref2 = "fs.s3a.baseref2";
    config.set(basekey, "orig");
    config.set(baseref2, "${fs.s3a.base}");
    setBucketOption(config, "b", basekey, "1024");
    setBucketOption(config, "b", baseref, "${fs.s3a.base}");
    Configuration updated = propagateBucketOptions(config, "b");
    assertOptionEquals(updated, basekey, "1024");
    assertOptionEquals(updated, baseref, "1024");
    assertOptionEquals(updated, baseref2, "1024");
  }

  @Test
  public void testMultipleBucketConfigurations() throws Throwable {
    Configuration config = new Configuration(false);
    setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
    setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
    config.set(USER_AGENT_PREFIX, "UA-orig");
    Configuration updated = propagateBucketOptions(config, "c");
    assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
  }

  @Test
  public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
    Configuration config = new Configuration(false);
    String impl = "fs.s3a.impl";
    config.set(impl, "orig");
    setBucketOption(config, "b", impl, "b");
    String metastoreImpl = "fs.s3a.metadatastore.impl";
    String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
    setBucketOption(config, "b", metastoreImpl, ddb);
    setBucketOption(config, "b", "impl2", "b2");
    setBucketOption(config, "b", "bucket.b.loop", "b3");
    assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");

    Configuration updated = propagateBucketOptions(config, "b");
    assertOptionEquals(updated, impl, "orig");
    assertOptionEquals(updated, "fs.s3a.impl2", "b2");
    assertOptionEquals(updated, metastoreImpl, ddb);
    assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
  }

  @Test
  public void testConfOptionPropagationToFS() throws Exception {
    Configuration config = new Configuration();
    String testFSName = config.getTrimmed(TEST_FS_S3A_NAME, "");
    String bucket = new URI(testFSName).getHost();
    setBucketOption(config, bucket, "propagation", "propagated");
    fs = S3ATestUtils.createTestFileSystem(config);
    Configuration updated = fs.getConf();
    assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
  }

  @Test
  public void testSecurityCredentialPropagationNoOverride() throws Exception {
    Configuration config = new Configuration();
    config.set(CREDENTIAL_PROVIDER_PATH, "base");
    patchSecurityCredentialProviders(config);
    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
        "base");
  }

  @Test
  public void testSecurityCredentialPropagationOverrideNoBase()
      throws Exception {
    Configuration config = new Configuration();
    config.unset(CREDENTIAL_PROVIDER_PATH);
    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
    patchSecurityCredentialProviders(config);
    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
        "override");
  }

  @Test
  public void testSecurityCredentialPropagationOverride() throws Exception {
    Configuration config = new Configuration();
    config.set(CREDENTIAL_PROVIDER_PATH, "base");
    config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
    patchSecurityCredentialProviders(config);
    assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
        "override,base");
    Collection<String> all = config.getStringCollection(
        CREDENTIAL_PROVIDER_PATH);
    assertTrue(all.contains("override"));
    assertTrue(all.contains("base"));
  }

  @Test
  public void testSecurityCredentialPropagationEndToEnd() throws Exception {
    Configuration config = new Configuration();
    config.set(CREDENTIAL_PROVIDER_PATH, "base");
    setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
        "override");
    Configuration updated = propagateBucketOptions(config, "b");

    patchSecurityCredentialProviders(updated);
    assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
        "override,base");
  }

}
@@ -85,18 +85,6 @@ public interface S3ATestConstants {
   */
  String DEFAULT_CSVTEST_FILE = "s3a://landsat-pds/scene_list.gz";

  /**
   * Endpoint for the S3 CSV/scale tests. This defaults to
   * being us-east.
   */
  String KEY_CSVTEST_ENDPOINT = S3A_SCALE_TEST + "csvfile.endpoint";

  /**
   * Endpoint for the S3 CSV/scale tests. This defaults to
   * being us-east.
   */
  String DEFAULT_CSVTEST_ENDPOINT = "s3.amazonaws.com";

  /**
   * Name of the property to define the timeout for scale tests: {@value}.
   * Measured in seconds.
@@ -22,10 +22,11 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.internal.AssumptionViolatedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;

@@ -34,11 +35,14 @@ import java.util.List;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.junit.Assert.*;

/**
 * Utilities for the S3A tests.
 */
public final class S3ATestUtils {
  private static final Logger LOG = LoggerFactory.getLogger(
      S3ATestUtils.class);

  /**
   * Value to set a system property to (in maven) to declare that

@@ -136,20 +140,6 @@ public final class S3ATestUtils {
    return fc;
  }

  /**
   * patch the endpoint option so that irrespective of where other tests
   * are working, the IO performance tests can work with the landsat
   * images.
   * @param conf configuration to patch
   */
  public static void useCSVDataEndpoint(Configuration conf) {
    String endpoint = conf.getTrimmed(S3AScaleTestBase.KEY_CSVTEST_ENDPOINT,
        S3AScaleTestBase.DEFAULT_CSVTEST_ENDPOINT);
    if (!endpoint.isEmpty()) {
      conf.set(ENDPOINT, endpoint);
    }
  }

  /**
   * Get a long test property.
   * <ol>
@@ -511,4 +501,47 @@ public final class S3ATestUtils {
   */
  private S3ATestUtils() {
  }

  /**
   * Set a bucket specific property to a particular value.
   * If the generic key passed in has an {@code fs.s3a. prefix},
   * that's stripped off, so that when the bucket properties are propagated
   * down to the generic values, that value gets copied down.
   * @param conf configuration to set
   * @param bucket bucket name
   * @param genericKey key; can start with "fs.s3a."
   * @param value value to set
   */
  public static void setBucketOption(Configuration conf, String bucket,
      String genericKey, String value) {
    final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
        genericKey.substring(FS_S3A_PREFIX.length())
        : genericKey;
    conf.set(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey, value);
  }

  /**
   * Assert that a configuration option matches the expected value.
   * @param conf configuration
   * @param key option key
   * @param expected expected value
   */
  public static void assertOptionEquals(Configuration conf,
      String key,
      String expected) {
    assertEquals("Value of " + key, expected, conf.get(key));
  }

  /**
   * Assume that a condition is met. If not: log at WARN and
   * then throw an {@link AssumptionViolatedException}.
   * @param message
   * @param condition
   */
  public static void assume(String message, boolean condition) {
    if (!condition) {
      LOG.warn(message);
    }
    Assume.assumeTrue(message, condition);
  }
}
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;

@@ -36,7 +35,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

@@ -47,6 +45,7 @@ import java.io.IOException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;

/**
 * Look at the performance of S3a operations.

@@ -79,10 +78,11 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
    String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
    if (testFile.isEmpty()) {
      assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
      LOG.warn(assumptionMessage);
      testDataAvailable = false;
    } else {
      S3ATestUtils.useCSVDataEndpoint(conf);
      testData = new Path(testFile);
      LOG.info("Using {} as input stream source", testData);
      Path path = this.testData;
      bindS3aFS(path);
      try {
@@ -113,7 +113,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
   * Declare that the test requires the CSV test dataset.
   */
  private void requireCSVTestData() {
    Assume.assumeTrue(assumptionMessage, testDataAvailable);
    assume(assumptionMessage, testDataAvailable);
  }

  /**

@@ -146,7 +146,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {

  /**
   * Open a test file with the read buffer specified in the setting
   * {@link #KEY_READ_BUFFER_SIZE}.
   * {@link org.apache.hadoop.fs.s3a.S3ATestConstants#KEY_READ_BUFFER_SIZE}.
   *
   * @param path path to open
   * @param inputPolicy input policy to use
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -91,14 +90,13 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
    super.setup();
    testPath = path("/tests3ascale");
    LOG.debug("Scale test operation count = {}", getOperationCount());
    // multipart purges are disabled on the scale tests
    // check for the test being enabled
    enabled = getTestPropertyBool(
        getConf(),
        KEY_SCALE_TESTS_ENABLED,
        DEFAULT_SCALE_TESTS_ENABLED);
    Assume.assumeTrue("Scale test disabled: to enable set property " +
        KEY_SCALE_TESTS_ENABLED, isEnabled());
    assume("Scale test disabled: to enable set property " +
        KEY_SCALE_TESTS_ENABLED,
        isEnabled());
  }

  /**