Hadoop 16857. ABFS: Stop CustomTokenProvider retry logic to depend on AbfsRestOp retry policy
Contributed by Sneha Vijayarajan
This commit is contained in:
parent
1340518cd8
commit
32fb174da2
|
@ -110,6 +110,11 @@ public class AbfsConfiguration{
|
|||
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
|
||||
private int maxIoRetries;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT,
|
||||
MinValue = 0,
|
||||
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
|
||||
private int customTokenFetchRetryCount;
|
||||
|
||||
@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
||||
MinValue = 0,
|
||||
MaxValue = MAX_AZURE_BLOCK_SIZE,
|
||||
|
@ -425,6 +430,10 @@ public class AbfsConfiguration{
|
|||
return this.maxIoRetries;
|
||||
}
|
||||
|
||||
public int getCustomTokenFetchRetryCount() {
|
||||
return this.customTokenFetchRetryCount;
|
||||
}
|
||||
|
||||
public long getAzureBlockSize() {
|
||||
return this.azureBlockSize;
|
||||
}
|
||||
|
@ -597,7 +606,7 @@ public class AbfsConfiguration{
|
|||
LOG.trace("Initializing {}", customTokenProviderClass.getName());
|
||||
azureTokenProvider.initialize(rawConfig, accountName);
|
||||
LOG.trace("{} init complete", customTokenProviderClass.getName());
|
||||
return new CustomTokenProviderAdapter(azureTokenProvider);
|
||||
return new CustomTokenProviderAdapter(azureTokenProvider, getCustomTokenFetchRetryCount());
|
||||
} catch(IllegalArgumentException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
|
@ -732,6 +741,11 @@ public class AbfsConfiguration{
|
|||
this.listMaxResults = listMaxResults;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMaxIoRetries(int maxIoRetries) {
|
||||
this.maxIoRetries = maxIoRetries;
|
||||
}
|
||||
|
||||
private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
|
||||
String value = getPasswordString(key);
|
||||
if (StringUtils.isBlank(value)) {
|
||||
|
|
|
@ -36,6 +36,7 @@ public final class ConfigurationKeys {
|
|||
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
|
||||
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
|
||||
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
|
||||
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";
|
||||
|
||||
// Read and write buffer sizes defined by the user
|
||||
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
||||
|
|
|
@ -37,6 +37,7 @@ public final class FileSystemConfigurations {
|
|||
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
|
||||
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
|
||||
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
|
||||
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
|
||||
|
||||
private static final int ONE_KB = 1024;
|
||||
private static final int ONE_MB = ONE_KB * ONE_KB;
|
||||
|
|
|
@ -230,12 +230,23 @@ public final class AzureADAuthenticator {
|
|||
final StringBuilder sb = new StringBuilder();
|
||||
sb.append("HTTP Error ");
|
||||
sb.append(httpErrorCode);
|
||||
sb.append("; url='").append(url).append('\'');
|
||||
sb.append(' ');
|
||||
if (!url.isEmpty()) {
|
||||
sb.append("; url='").append(url).append('\'').append(' ');
|
||||
}
|
||||
|
||||
sb.append(super.getMessage());
|
||||
sb.append("; requestId='").append(requestId).append('\'');
|
||||
sb.append("; contentType='").append(contentType).append('\'');
|
||||
sb.append("; response '").append(body).append('\'');
|
||||
if (!requestId.isEmpty()) {
|
||||
sb.append("; requestId='").append(requestId).append('\'');
|
||||
}
|
||||
|
||||
if (!contentType.isEmpty()) {
|
||||
sb.append("; contentType='").append(contentType).append('\'');
|
||||
}
|
||||
|
||||
if (!body.isEmpty()) {
|
||||
sb.append("; response '").append(body).append('\'');
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.HttpException;
|
||||
|
||||
/**
|
||||
* Provides tokens based on custom implementation, following the Adapter Design
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
|||
public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
||||
implements BoundDTExtension {
|
||||
|
||||
private final int fetchTokenRetryCount;
|
||||
private CustomTokenProviderAdaptee adaptee;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
|
||||
|
||||
|
@ -45,17 +47,57 @@ public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
|||
* Constructs a token provider based on the custom token provider.
|
||||
*
|
||||
* @param adaptee the custom token provider
|
||||
* @param customTokenFetchRetryCount max retry count for customTokenFetch
|
||||
*/
|
||||
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) {
|
||||
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee, int customTokenFetchRetryCount) {
|
||||
Preconditions.checkNotNull(adaptee, "adaptee");
|
||||
this.adaptee = adaptee;
|
||||
fetchTokenRetryCount = customTokenFetchRetryCount;
|
||||
}
|
||||
|
||||
protected AzureADToken refreshToken() throws IOException {
|
||||
LOG.debug("AADToken: refreshing custom based token");
|
||||
|
||||
AzureADToken azureADToken = new AzureADToken();
|
||||
azureADToken.setAccessToken(adaptee.getAccessToken());
|
||||
|
||||
String accessToken = null;
|
||||
|
||||
Exception ex;
|
||||
boolean succeeded = false;
|
||||
// Custom token providers should have their own retry policies,
|
||||
// Providing a linear retry option for the the retry count
|
||||
// mentioned in config "fs.azure.custom.token.fetch.retry.count"
|
||||
int retryCount = fetchTokenRetryCount;
|
||||
do {
|
||||
ex = null;
|
||||
try {
|
||||
accessToken = adaptee.getAccessToken();
|
||||
LOG.trace("CustomTokenProvider Access token fetch was successful with retry count {}",
|
||||
(fetchTokenRetryCount - retryCount));
|
||||
} catch (Exception e) {
|
||||
LOG.debug("CustomTokenProvider Access token fetch failed with retry count {}",
|
||||
(fetchTokenRetryCount - retryCount));
|
||||
ex = e;
|
||||
}
|
||||
|
||||
succeeded = (ex == null);
|
||||
retryCount--;
|
||||
} while (!succeeded && (retryCount) >= 0);
|
||||
|
||||
if (!succeeded) {
|
||||
HttpException httpEx = new HttpException(
|
||||
-1,
|
||||
"",
|
||||
String.format("CustomTokenProvider getAccessToken threw %s : %s",
|
||||
ex.getClass().getTypeName(), ex.getMessage()),
|
||||
"",
|
||||
"",
|
||||
""
|
||||
);
|
||||
throw httpEx;
|
||||
}
|
||||
|
||||
azureADToken.setAccessToken(accessToken);
|
||||
azureADToken.setExpiry(adaptee.getExpiryTime());
|
||||
|
||||
return azureADToken;
|
||||
|
|
|
@ -539,6 +539,8 @@ token when its `getAccessToken()` method is invoked.
|
|||
The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee`
|
||||
and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
|
||||
|
||||
The declared class also holds responsibility to implement retry logic while fetching access tokens.
|
||||
|
||||
## <a name="technical"></a> Technical notes
|
||||
|
||||
### <a name="proxy"></a> Proxy setup
|
||||
|
|
|
@ -79,12 +79,13 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testRequestRetryConfig() throws Exception {
|
||||
testRetryLogic(0);
|
||||
testRetryLogic(3);
|
||||
public void testCustomTokenFetchRetryCount() throws Exception {
|
||||
testWithDifferentCustomTokenFetchRetry(0);
|
||||
testWithDifferentCustomTokenFetchRetry(3);
|
||||
testWithDifferentCustomTokenFetchRetry(5);
|
||||
}
|
||||
|
||||
public void testRetryLogic(int numOfRetries) throws Exception {
|
||||
public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exception {
|
||||
AzureBlobFileSystem fs = this.getFileSystem();
|
||||
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
|
@ -93,7 +94,7 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
|||
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
||||
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
||||
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
||||
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
|
||||
config.set("fs.azure.custom.token.fetch.retry.count", Integer.toString(numOfRetries));
|
||||
// Stop filesystem creation as it will lead to calls to store.
|
||||
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
||||
|
||||
|
@ -110,7 +111,7 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
|||
// Number of retries done should be as configured
|
||||
Assert.assertTrue(
|
||||
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
|
||||
+ ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries
|
||||
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
|
||||
/**
|
||||
* Unit test TestExponentialRetryPolicy.
|
||||
*/
|
||||
public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private final int maxRetryCount = 30;
|
||||
private final int noRetryCount = 0;
|
||||
private final int retryCount = new Random().nextInt(maxRetryCount);
|
||||
private final int retryCountBeyondMax = maxRetryCount + 1;
|
||||
|
||||
|
||||
public TestExponentialRetryPolicy() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentMaxIORetryCount() throws Exception {
|
||||
AbfsConfiguration abfsConfig = getAbfsConfig();
|
||||
abfsConfig.setMaxIoRetries(noRetryCount);
|
||||
testMaxIOConfig(abfsConfig);
|
||||
abfsConfig.setMaxIoRetries(retryCount);
|
||||
testMaxIOConfig(abfsConfig);
|
||||
abfsConfig.setMaxIoRetries(retryCountBeyondMax);
|
||||
testMaxIOConfig(abfsConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultMaxIORetryCount() throws Exception {
|
||||
AbfsConfiguration abfsConfig = getAbfsConfig();
|
||||
Assert.assertTrue(
|
||||
String.format("default maxIORetry count is %s.", maxRetryCount),
|
||||
abfsConfig.getMaxIoRetries() == maxRetryCount);
|
||||
testMaxIOConfig(abfsConfig);
|
||||
}
|
||||
|
||||
private AbfsConfiguration getAbfsConfig() throws Exception {
|
||||
Configuration
|
||||
config = new Configuration(this.getRawConfiguration());
|
||||
return new AbfsConfiguration(config, "dummyAccountName");
|
||||
}
|
||||
|
||||
private void testMaxIOConfig(AbfsConfiguration abfsConfig) {
|
||||
ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(
|
||||
abfsConfig.getMaxIoRetries());
|
||||
int localRetryCount = 0;
|
||||
|
||||
while (localRetryCount < abfsConfig.getMaxIoRetries()) {
|
||||
Assert.assertTrue(
|
||||
"Retry should be allowed when retryCount less than max count configured.",
|
||||
retryPolicy.shouldRetry(localRetryCount, -1));
|
||||
localRetryCount++;
|
||||
}
|
||||
|
||||
Assert.assertTrue(
|
||||
"When all retries are exhausted, the retryCount will be same as max configured",
|
||||
localRetryCount == abfsConfig.getMaxIoRetries());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue