diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/alias/CredentialProviderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/alias/CredentialProviderFactory.java
index 1b2ac41fa84..8b39337ed18 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/alias/CredentialProviderFactory.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/alias/CredentialProviderFactory.java
@@ -25,11 +25,13 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.PathIOException;
 
 /**
  * A factory to create a list of CredentialProvider based on the path given in a
@@ -59,9 +61,18 @@ public abstract class CredentialProviderFactory {
     }
   }
 
+  /**
+   * Fail fast on any recursive load of credential providers, which can
+   * happen if the FS itself triggers the load.
+   * A simple boolean could be used here, as the synchronized block ensures
+   * that only one thread can be active at a time. An atomic is used
+   * for rigorousness.
+   */
+  private static final AtomicBoolean SERVICE_LOADER_LOCKED = new AtomicBoolean(false);
+
   public static List<CredentialProvider> getProviders(Configuration conf
                                        ) throws IOException {
-    List<CredentialProvider> result = new ArrayList<CredentialProvider>();
+    List<CredentialProvider> result = new ArrayList<>();
     for(String path: conf.getStringCollection(CREDENTIAL_PROVIDER_PATH)) {
       try {
         URI uri = new URI(path);
@@ -69,13 +80,23 @@ public abstract class CredentialProviderFactory {
         // Iterate serviceLoader in a synchronized block since
         // serviceLoader iterator is not thread-safe.
         synchronized (serviceLoader) {
-          for (CredentialProviderFactory factory : serviceLoader) {
-            CredentialProvider kp = factory.createProvider(uri, conf);
-            if (kp != null) {
-              result.add(kp);
-              found = true;
-              break;
+          try {
+            if (SERVICE_LOADER_LOCKED.getAndSet(true)) {
+              throw new PathIOException(path,
+                  "Recursive load of credential provider; " +
+                      "if loading a JCEKS file, this means that the filesystem connector is " +
+                      "trying to load the same file");
             }
+            for (CredentialProviderFactory factory : serviceLoader) {
+              CredentialProvider kp = factory.createProvider(uri, conf);
+              if (kp != null) {
+                result.add(kp);
+                found = true;
+                break;
+              }
+            }
+          } finally {
+            SERVICE_LOADER_LOCKED.set(false);
           }
         }
         if (!found) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 06fd6040a5e..e0dfb56e507 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -186,6 +186,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.fs.store.EtagChecksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -393,6 +394,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     LOG.debug("Initializing S3AFileSystem for {}", bucket);
     // clone the configuration into one with propagated bucket options
     Configuration conf = propagateBucketOptions(originalConf, bucket);
+
+    // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
+    conf = ProviderUtils.excludeIncompatibleCredentialProviders(
+        conf, S3AFileSystem.class);
+
     // fix up the classloader of the configuration to be whatever
     // classloader loaded this filesystem.
     // See: HADOOP-17372
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index fe5b141cea4..8a181f2a753 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -1153,7 +1153,7 @@ public final class S3AUtils {
   public static Configuration propagateBucketOptions(Configuration source,
       String bucket) {
 
-    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket is null/empty");
     final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket +'.';
     LOG.debug("Propagating entries under {}", bucketPrefix);
     final Configuration dest = new Configuration(source);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestJceksIO.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestJceksIO.java
new file mode 100644
index 00000000000..3649a6731a0
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestJceksIO.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.alias.CredentialShell;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Test JCEKS file load/save on S3a.
+ * Uses CredentialShell to better replicate the CLI.
+ *
+ * See HADOOP-17894.
+ * This test is at risk of leaking FS instances in the JCEKS providers;
+ * this is handled in an AfterClass operation.
+ *
+ */
+public class ITestJceksIO extends AbstractS3ATestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestJceksIO.class);
+  private static final String UTF8 = StandardCharsets.UTF_8.name();
+  private PrintStream oldStdout, oldStderr;
+  private ByteArrayOutputStream stdout, stderr;
+  private PrintStream printStdout, printStderr;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    oldStdout = System.out;
+    oldStderr = System.err;
+    stdout = new ByteArrayOutputStream();
+    printStdout = new PrintStream(stdout);
+    System.setOut(printStdout);
+
+    stderr = new ByteArrayOutputStream();
+    printStderr = new PrintStream(stderr);
+    System.setErr(printStderr);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    System.setOut(oldStdout);
+    System.setErr(oldStderr);
+    IOUtils.cleanupWithLogger(LOG, printStdout, printStderr);
+    super.teardown();
+  }
+
+  /**
+   * Shut down all filesystems for this user to avoid
+   * leaking those used by credential providers.
+   */
+  @AfterClass
+  public static void closeAllFilesystems() {
+    try {
+      LOG.info("Closing down all filesystems for current user");
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    } catch (IOException e) {
+      LOG.warn("UGI.getCurrentUser()", e);
+    }
+  }
+
+  /**
+   * FS config with no providers. FS caching is disabled.
+   * @return a new configuration.
+   */
+  private Configuration createNewConfiguration() {
+    final Configuration conf = new Configuration(getConfiguration());
+    removeBaseAndBucketOverrides(conf,
+        HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
+        S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
+    disableFilesystemCaching(conf);
+    return conf;
+
+  }
+
+  /*
+   * List credentials; expect the file to be missing.
+   * hadoop credential list -provider jceks://s3a@bucket/s3.jceks
+   */
+  @Test
+  public void testListMissingJceksFile() throws Throwable {
+    final Path dir = path("jceks");
+    Path keystore = new Path(dir, "keystore.jceks");
+    String jceksProvider = toJceksProvider(keystore);
+
+    CredentialShell cs = new CredentialShell();
+
+    cs.setConf(createNewConfiguration());
+    run(cs, null,
+        "list", "-provider", jceksProvider);
+  }
+
+  @Test
+  public void testCredentialSuccessfulLifecycle() throws Exception {
+    final Path dir = path("jceks");
+    Path keystore = new Path(dir, "keystore.jceks");
+    String jceksProvider = toJceksProvider(keystore);
+    CredentialShell cs = new CredentialShell();
+    cs.setConf(createNewConfiguration());
+    run(cs, "credential1 has been successfully created.", "create", "credential1", "-value",
+        "p@ssw0rd", "-provider",
+        jceksProvider);
+
+    assertIsFile(keystore);
+    run(cs, "credential1",
+        "list", "-provider", jceksProvider);
+
+    run(cs, "credential1 has been successfully deleted.",
+        "delete", "credential1", "-f", "-provider",
+        jceksProvider);
+
+    String[] args5 = {
+        "list", "-provider",
+        jceksProvider
+    };
+    String out = run(cs, null, args5);
+    Assertions.assertThat(out)
+        .describedAs("Command result of list")
+        .doesNotContain("credential1");
+  }
+
+  private String run(CredentialShell cs, String expected, String... args)
+      throws Exception {
+    stdout.reset();
+    int rc = cs.run(args);
+    final String out = stdout.toString(UTF8);
+    LOG.error("{}", stderr.toString(UTF8));
+    LOG.info("{}", out);
+
+    Assertions.assertThat(rc)
+        .describedAs("Command result of %s with output %s",
+            args[0], out)
+        .isEqualTo(0);
+    if (expected != null) {
+      Assertions.assertThat(out)
+          .describedAs("Command result of %s", args[0])
+          .contains(expected);
+    }
+    return out;
+  }
+
+  /**
+   * Convert a path to a jceks URI.
+   * @param keystore store
+   * @return string for the command line
+   */
+  private String toJceksProvider(Path keystore) {
+    final URI uri = keystore.toUri();
+    return String.format("jceks://%s@%s%s",
+        uri.getScheme(), uri.getHost(), uri.getPath());
+  }
+
+}
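A minimal sketch, not part of the patch, of the failure mode this change addresses; the bucket name, keystore path, and class name below are invented for illustration. The trigger is a JCEKS keystore stored on the same S3A bucket whose secrets it holds: instantiating the filesystem loads the credential providers, and the jceks-on-s3a provider in turn tries to instantiate the same filesystem, so before this change the load could recurse until it failed. With the patch, S3AFileSystem.initialize() strips the s3a-backed provider via ProviderUtils.excludeIncompatibleCredentialProviders(), and CredentialProviderFactory.getProviders() fails fast with a PathIOException if it is ever re-entered.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;

public final class JceksRecursionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical values: a keystore kept in the very bucket whose
    // credentials it stores. Reading the bucket needs the secrets,
    // and reading the secrets needs the bucket.
    conf.set(HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH,
        "jceks://s3a@example-bucket/secrets/keystore.jceks");
    // With the patch applied, initialization succeeds because the
    // incompatible s3a-backed provider is excluded before any S3 call.
    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      System.out.println("Loaded " + fs.getUri());
    }
  }
}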