HADOOP-17894. CredentialProviderFactory.getProviders() recursion loading JCEKS file from S3A (#3393)
* CredentialProviderFactory to detect and report on recursion. * S3AFS to remove incompatible providers. * Integration Test for this. Contributed by Steve Loughran. Change-Id: Ia247b3c9fe8488ffdb7f57b40eb6e37c57e522ef
This commit is contained in:
parent
76393e1359
commit
a2242df10a
|
@ -25,11 +25,13 @@ import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.PathIOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A factory to create a list of CredentialProvider based on the path given in a
|
* A factory to create a list of CredentialProvider based on the path given in a
|
||||||
|
@ -59,9 +61,18 @@ public abstract class CredentialProviderFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fail fast on any recursive load of credential providers, which can
|
||||||
|
* happen if the FS itself triggers the load.
|
||||||
|
* A simple boolean could be used here, as the synchronized block ensures
|
||||||
|
* that only one thread can be active at a time. An atomic is used
|
||||||
|
* for rigorousness.
|
||||||
|
*/
|
||||||
|
private static final AtomicBoolean SERVICE_LOADER_LOCKED = new AtomicBoolean(false);
|
||||||
|
|
||||||
public static List<CredentialProvider> getProviders(Configuration conf
|
public static List<CredentialProvider> getProviders(Configuration conf
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
List<CredentialProvider> result = new ArrayList<CredentialProvider>();
|
List<CredentialProvider> result = new ArrayList<>();
|
||||||
for(String path: conf.getStringCollection(CREDENTIAL_PROVIDER_PATH)) {
|
for(String path: conf.getStringCollection(CREDENTIAL_PROVIDER_PATH)) {
|
||||||
try {
|
try {
|
||||||
URI uri = new URI(path);
|
URI uri = new URI(path);
|
||||||
|
@ -69,13 +80,23 @@ public abstract class CredentialProviderFactory {
|
||||||
// Iterate serviceLoader in a synchronized block since
|
// Iterate serviceLoader in a synchronized block since
|
||||||
// serviceLoader iterator is not thread-safe.
|
// serviceLoader iterator is not thread-safe.
|
||||||
synchronized (serviceLoader) {
|
synchronized (serviceLoader) {
|
||||||
for (CredentialProviderFactory factory : serviceLoader) {
|
try {
|
||||||
CredentialProvider kp = factory.createProvider(uri, conf);
|
if (SERVICE_LOADER_LOCKED.getAndSet(true)) {
|
||||||
if (kp != null) {
|
throw new PathIOException(path,
|
||||||
result.add(kp);
|
"Recursive load of credential provider; " +
|
||||||
found = true;
|
"if loading a JCEKS file, this means that the filesystem connector is " +
|
||||||
break;
|
"trying to load the same file");
|
||||||
}
|
}
|
||||||
|
for (CredentialProviderFactory factory : serviceLoader) {
|
||||||
|
CredentialProvider kp = factory.createProvider(uri, conf);
|
||||||
|
if (kp != null) {
|
||||||
|
result.add(kp);
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
SERVICE_LOADER_LOCKED.set(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!found) {
|
if (!found) {
|
||||||
|
|
|
@ -186,6 +186,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
import org.apache.hadoop.fs.store.EtagChecksum;
|
import org.apache.hadoop.fs.store.EtagChecksum;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
|
||||||
|
import org.apache.hadoop.security.ProviderUtils;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
@ -393,6 +394,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
LOG.debug("Initializing S3AFileSystem for {}", bucket);
|
LOG.debug("Initializing S3AFileSystem for {}", bucket);
|
||||||
// clone the configuration into one with propagated bucket options
|
// clone the configuration into one with propagated bucket options
|
||||||
Configuration conf = propagateBucketOptions(originalConf, bucket);
|
Configuration conf = propagateBucketOptions(originalConf, bucket);
|
||||||
|
|
||||||
|
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
|
||||||
|
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||||
|
conf, S3AFileSystem.class);
|
||||||
|
|
||||||
// fix up the classloader of the configuration to be whatever
|
// fix up the classloader of the configuration to be whatever
|
||||||
// classloader loaded this filesystem.
|
// classloader loaded this filesystem.
|
||||||
// See: HADOOP-17372
|
// See: HADOOP-17372
|
||||||
|
|
|
@ -1153,7 +1153,7 @@ public final class S3AUtils {
|
||||||
public static Configuration propagateBucketOptions(Configuration source,
|
public static Configuration propagateBucketOptions(Configuration source,
|
||||||
String bucket) {
|
String bucket) {
|
||||||
|
|
||||||
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket");
|
Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket is null/empty");
|
||||||
final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket +'.';
|
final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket +'.';
|
||||||
LOG.debug("Propagating entries under {}", bucketPrefix);
|
LOG.debug("Propagating entries under {}", bucketPrefix);
|
||||||
final Configuration dest = new Configuration(source);
|
final Configuration dest = new Configuration(source);
|
||||||
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.auth;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.alias.CredentialShell;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test JCEKS file load/save on S3a.
|
||||||
|
* Uses CredentialShell to better replicate the CLI.
|
||||||
|
*
|
||||||
|
* See HADOOP-17894.
|
||||||
|
* This test is at risk of leaking FS instances in the JCEKS providers;
|
||||||
|
* this is handled in an AfterClass operation.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ITestJceksIO extends AbstractS3ATestBase {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
ITestJceksIO.class);
|
||||||
|
private static final String UTF8 = StandardCharsets.UTF_8.name();
|
||||||
|
private PrintStream oldStdout, oldStderr;
|
||||||
|
private ByteArrayOutputStream stdout, stderr;
|
||||||
|
private PrintStream printStdout, printStderr;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
oldStdout = System.out;
|
||||||
|
oldStderr = System.err;
|
||||||
|
stdout = new ByteArrayOutputStream();
|
||||||
|
printStdout = new PrintStream(stdout);
|
||||||
|
System.setOut(printStdout);
|
||||||
|
|
||||||
|
stderr = new ByteArrayOutputStream();
|
||||||
|
printStderr = new PrintStream(stderr);
|
||||||
|
System.setErr(printStderr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
System.setOut(oldStdout);
|
||||||
|
System.setErr(oldStderr);
|
||||||
|
IOUtils.cleanupWithLogger(LOG, printStdout, printStderr);
|
||||||
|
super.teardown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shut down all filesystems for this user to avoid
|
||||||
|
* leaking those used by credential providers.
|
||||||
|
*/
|
||||||
|
@AfterClass
|
||||||
|
public static void closeAllFilesystems() {
|
||||||
|
try {
|
||||||
|
LOG.info("Closing down all filesystems for current user");
|
||||||
|
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("UGI.getCurrentUser()", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FS config with no providers. FS caching is disabled.
|
||||||
|
* @return a new configuration.
|
||||||
|
*/
|
||||||
|
private Configuration createNewConfiguration() {
|
||||||
|
final Configuration conf = new Configuration(getConfiguration());
|
||||||
|
removeBaseAndBucketOverrides(conf,
|
||||||
|
HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
|
||||||
|
S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
|
||||||
|
disableFilesystemCaching(conf);
|
||||||
|
return conf;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* List credentials; expect the file to be missing.
|
||||||
|
* hadoop credential list -provider jceks://s3a@bucket/s3.jceks
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testListMissingJceksFile() throws Throwable {
|
||||||
|
final Path dir = path("jceks");
|
||||||
|
Path keystore = new Path(dir, "keystore.jceks");
|
||||||
|
String jceksProvider = toJceksProvider(keystore);
|
||||||
|
|
||||||
|
CredentialShell cs = new CredentialShell();
|
||||||
|
|
||||||
|
cs.setConf(createNewConfiguration());
|
||||||
|
run(cs, null,
|
||||||
|
"list", "-provider", jceksProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCredentialSuccessfulLifecycle() throws Exception {
|
||||||
|
final Path dir = path("jceks");
|
||||||
|
Path keystore = new Path(dir, "keystore.jceks");
|
||||||
|
String jceksProvider = toJceksProvider(keystore);
|
||||||
|
CredentialShell cs = new CredentialShell();
|
||||||
|
cs.setConf(createNewConfiguration());
|
||||||
|
run(cs, "credential1 has been successfully created.", "create", "credential1", "-value",
|
||||||
|
"p@ssw0rd", "-provider",
|
||||||
|
jceksProvider);
|
||||||
|
|
||||||
|
assertIsFile(keystore);
|
||||||
|
run(cs, "credential1",
|
||||||
|
"list", "-provider", jceksProvider);
|
||||||
|
|
||||||
|
run(cs, "credential1 has been successfully deleted.",
|
||||||
|
"delete", "credential1", "-f", "-provider",
|
||||||
|
jceksProvider);
|
||||||
|
|
||||||
|
String[] args5 = {
|
||||||
|
"list", "-provider",
|
||||||
|
jceksProvider
|
||||||
|
};
|
||||||
|
String out = run(cs, null, args5);
|
||||||
|
Assertions.assertThat(out)
|
||||||
|
.describedAs("Command result of list")
|
||||||
|
.doesNotContain("credential1");
|
||||||
|
}
|
||||||
|
|
||||||
|
private String run(CredentialShell cs, String expected, String... args)
|
||||||
|
throws Exception {
|
||||||
|
stdout.reset();
|
||||||
|
int rc = cs.run(args);
|
||||||
|
final String out = stdout.toString(UTF8);
|
||||||
|
LOG.error("{}", stderr.toString(UTF8));
|
||||||
|
LOG.info("{}", out);
|
||||||
|
|
||||||
|
Assertions.assertThat(rc)
|
||||||
|
.describedAs("Command result of %s with output %s",
|
||||||
|
args[0], out)
|
||||||
|
.isEqualTo(0);
|
||||||
|
if (expected != null) {
|
||||||
|
Assertions.assertThat(out)
|
||||||
|
.describedAs("Command result of %s", args[0])
|
||||||
|
.contains(expected);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a path to a jceks URI.
|
||||||
|
* @param keystore store
|
||||||
|
* @return string for the command line
|
||||||
|
*/
|
||||||
|
private String toJceksProvider(Path keystore) {
|
||||||
|
final URI uri = keystore.toUri();
|
||||||
|
return String.format("jceks://%s@%s%s",
|
||||||
|
uri.getScheme(), uri.getHost(), uri.getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue