HADOOP-18233. Possible race condition with TemporaryAWSCredentialsProvider (#5024)

This fixes a race condition with the TemporaryAWSCredentialProvider
one which has existed for a long time but which only surfaced
(usually in Spark) when the bucket existence probe was disabled
by setting fs.s3a.bucket.probe to 0 -a performance speedup
which was made the default in HADOOP-17454.

Contributed by Jimmy Wong.
This commit is contained in:
sabertiger 2022-10-31 05:43:30 -07:00 committed by GitHub
parent cbe02c2e77
commit af7dd660e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 171 additions and 10 deletions

View File

@ -45,7 +45,7 @@ public abstract class AbstractSessionCredentialsProvider
extends AbstractAWSCredentialProvider { extends AbstractAWSCredentialProvider {
/** Credentials, created in {@link #init()}. */ /** Credentials, created in {@link #init()}. */
private AWSCredentials awsCredentials; private volatile AWSCredentials awsCredentials;
/** Atomic flag for on-demand initialization. */ /** Atomic flag for on-demand initialization. */
private final AtomicBoolean initialized = new AtomicBoolean(false); private final AtomicBoolean initialized = new AtomicBoolean(false);
@ -54,7 +54,7 @@ public abstract class AbstractSessionCredentialsProvider
* The (possibly translated) initialization exception. * The (possibly translated) initialization exception.
* Used for testing. * Used for testing.
*/ */
private IOException initializationException; private volatile IOException initializationException;
/** /**
* Constructor. * Constructor.
@ -73,9 +73,9 @@ public abstract class AbstractSessionCredentialsProvider
* @throws IOException on any failure. * @throws IOException on any failure.
*/ */
@Retries.OnceTranslated @Retries.OnceTranslated
protected void init() throws IOException { protected synchronized void init() throws IOException {
// stop re-entrant attempts // stop re-entrant attempts
if (initialized.getAndSet(true)) { if (isInitialized()) {
return; return;
} }
try { try {
@ -84,6 +84,8 @@ public abstract class AbstractSessionCredentialsProvider
} catch (IOException e) { } catch (IOException e) {
initializationException = e; initializationException = e;
throw e; throw e;
} finally {
initialized.set(true);
} }
} }
@ -132,13 +134,15 @@ public abstract class AbstractSessionCredentialsProvider
} }
if (awsCredentials == null) { if (awsCredentials == null) {
throw new CredentialInitializationException( throw new CredentialInitializationException(
"Provider " + this + " has no credentials"); "Provider " + this + " has no credentials: " +
(initializationException != null ? initializationException.toString() : ""),
initializationException);
} }
return awsCredentials; return awsCredentials;
} }
public final boolean hasCredentials() { public final boolean hasCredentials() {
return awsCredentials == null; return awsCredentials != null;
} }
@Override @Override

View File

@ -22,9 +22,15 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.URI; import java.net.URI;
import java.nio.file.AccessDeniedException; import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
@ -37,6 +43,7 @@ import org.junit.rules.ExpectedException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.AbstractSessionCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider; import org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider;
import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
@ -46,6 +53,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
@ -198,7 +206,7 @@ public class TestS3AAWSCredentialsProvider {
/** /**
* A credential provider whose constructor signature doesn't match. * A credential provider whose constructor signature doesn't match.
*/ */
static class ConstructorSignatureErrorProvider protected static class ConstructorSignatureErrorProvider
implements AWSCredentialsProvider { implements AWSCredentialsProvider {
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -218,7 +226,7 @@ public class TestS3AAWSCredentialsProvider {
/** /**
* A credential provider whose constructor raises an NPE. * A credential provider whose constructor raises an NPE.
*/ */
static class ConstructorFailureProvider protected static class ConstructorFailureProvider
implements AWSCredentialsProvider { implements AWSCredentialsProvider {
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -246,7 +254,7 @@ public class TestS3AAWSCredentialsProvider {
} }
} }
static class AWSExceptionRaisingFactory implements AWSCredentialsProvider { protected static class AWSExceptionRaisingFactory implements AWSCredentialsProvider {
public static final String NO_AUTH = "No auth"; public static final String NO_AUTH = "No auth";
@ -462,7 +470,7 @@ public class TestS3AAWSCredentialsProvider {
/** /**
* Credential provider which raises an IOE when constructed. * Credential provider which raises an IOE when constructed.
*/ */
private static class IOERaisingProvider implements AWSCredentialsProvider { protected static class IOERaisingProvider implements AWSCredentialsProvider {
public IOERaisingProvider(URI uri, Configuration conf) public IOERaisingProvider(URI uri, Configuration conf)
throws IOException { throws IOException {
@ -480,4 +488,153 @@ public class TestS3AAWSCredentialsProvider {
} }
} }
private static final AWSCredentials EXPECTED_CREDENTIALS = new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "expectedAccessKey";
}
@Override
public String getAWSSecretKey() {
return "expectedSecret";
}
};
/**
* Credential provider that takes a long time.
*/
protected static class SlowProvider extends AbstractSessionCredentialsProvider {
public SlowProvider(@Nullable URI uri, Configuration conf) {
super(uri, conf);
}
@Override
protected AWSCredentials createCredentials(Configuration config) throws IOException {
// yield to other callers to induce race condition
Thread.yield();
return EXPECTED_CREDENTIALS;
}
}
private static final int CONCURRENT_THREADS = 10;
@Test
public void testConcurrentAuthentication() throws Throwable {
Configuration conf = createProviderConfiguration(SlowProvider.class.getName());
Path testFile = getCSVTestPath(conf);
AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
SlowProvider provider = (SlowProvider) list.getProviders().get(0);
ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
List<Future<AWSCredentials>> results = new ArrayList<>();
try {
assertFalse(
"Provider not initialized. isInitialized should be false",
provider.isInitialized());
assertFalse(
"Provider not initialized. hasCredentials should be false",
provider.hasCredentials());
if (provider.getInitializationException() != null) {
throw new AssertionError(
"Provider not initialized. getInitializationException should return null",
provider.getInitializationException());
}
for (int i = 0; i < CONCURRENT_THREADS; i++) {
results.add(pool.submit(() -> list.getCredentials()));
}
for (Future<AWSCredentials> result : results) {
AWSCredentials credentials = result.get();
assertEquals("Access key from credential provider",
"expectedAccessKey", credentials.getAWSAccessKeyId());
assertEquals("Secret key from credential provider",
"expectedSecret", credentials.getAWSSecretKey());
}
} finally {
pool.awaitTermination(10, TimeUnit.SECONDS);
pool.shutdown();
}
assertTrue(
"Provider initialized without errors. isInitialized should be true",
provider.isInitialized());
assertTrue(
"Provider initialized without errors. hasCredentials should be true",
provider.hasCredentials());
if (provider.getInitializationException() != null) {
throw new AssertionError(
"Provider initialized without errors. getInitializationException should return null",
provider.getInitializationException());
}
}
/**
* Credential provider with error.
*/
protected static class ErrorProvider extends AbstractSessionCredentialsProvider {
public ErrorProvider(@Nullable URI uri, Configuration conf) {
super(uri, conf);
}
@Override
protected AWSCredentials createCredentials(Configuration config) throws IOException {
throw new IOException("expected error");
}
}
@Test
public void testConcurrentAuthenticationError() throws Throwable {
Configuration conf = createProviderConfiguration(ErrorProvider.class.getName());
Path testFile = getCSVTestPath(conf);
AWSCredentialProviderList list = createAWSCredentialProviderSet(testFile.toUri(), conf);
ErrorProvider provider = (ErrorProvider) list.getProviders().get(0);
ExecutorService pool = Executors.newFixedThreadPool(CONCURRENT_THREADS);
List<Future<AWSCredentials>> results = new ArrayList<>();
try {
assertFalse("Provider not initialized. isInitialized should be false",
provider.isInitialized());
assertFalse("Provider not initialized. hasCredentials should be false",
provider.hasCredentials());
if (provider.getInitializationException() != null) {
throw new AssertionError(
"Provider not initialized. getInitializationException should return null",
provider.getInitializationException());
}
for (int i = 0; i < CONCURRENT_THREADS; i++) {
results.add(pool.submit(() -> list.getCredentials()));
}
for (Future<AWSCredentials> result : results) {
interceptFuture(CredentialInitializationException.class,
"expected error",
result
);
}
} finally {
pool.awaitTermination(10, TimeUnit.SECONDS);
pool.shutdown();
}
assertTrue(
"Provider initialization failed. isInitialized should be true",
provider.isInitialized());
assertFalse(
"Provider initialization failed. hasCredentials should be false",
provider.hasCredentials());
assertTrue(
"Provider initialization failed. getInitializationException should contain the error",
provider.getInitializationException().getMessage().contains("expected error"));
}
} }