diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 084b5585327..78d6260b4f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -110,6 +110,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT, + MinValue = 0, + DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT) + private int customTokenFetchRetryCount; + @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME, MinValue = 0, MaxValue = MAX_AZURE_BLOCK_SIZE, @@ -425,6 +430,10 @@ public class AbfsConfiguration{ return this.maxIoRetries; } + public int getCustomTokenFetchRetryCount() { + return this.customTokenFetchRetryCount; + } + public long getAzureBlockSize() { return this.azureBlockSize; } @@ -597,7 +606,7 @@ public class AbfsConfiguration{ LOG.trace("Initializing {}", customTokenProviderClass.getName()); azureTokenProvider.initialize(rawConfig, accountName); LOG.trace("{} init complete", customTokenProviderClass.getName()); - return new CustomTokenProviderAdapter(azureTokenProvider); + return new CustomTokenProviderAdapter(azureTokenProvider, getCustomTokenFetchRetryCount()); } catch(IllegalArgumentException e) { throw e; } catch (Exception e) { @@ -732,6 +741,11 @@ public class AbfsConfiguration{ this.listMaxResults = listMaxResults; } + @VisibleForTesting + public void setMaxIoRetries(int maxIoRetries) { + this.maxIoRetries = maxIoRetries; + } + private String getTrimmedPasswordString(String key, String defaultValue) throws IOException { String value = getPasswordString(key); if (StringUtils.isBlank(value)) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 3d8b9874508..5db111b4c45 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -36,6 +36,7 @@ public final class ConfigurationKeys { public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval"; public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; + public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count"; // Read and write buffer sizes defined by the user public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 5c27d84f449..3add0efa011 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -37,6 +37,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; + public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3; private static final int ONE_KB = 1024; private static final int ONE_MB = ONE_KB * ONE_KB; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java index 93c40a71206..f836bab7667 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -230,12 +230,23 @@ public final class AzureADAuthenticator { final StringBuilder sb = new StringBuilder(); sb.append("HTTP Error "); sb.append(httpErrorCode); - sb.append("; url='").append(url).append('\''); - sb.append(' '); + if (!url.isEmpty()) { + sb.append("; url='").append(url).append('\'').append(' '); + } + sb.append(super.getMessage()); - sb.append("; requestId='").append(requestId).append('\''); - sb.append("; contentType='").append(contentType).append('\''); - sb.append("; response '").append(body).append('\''); + if (!requestId.isEmpty()) { + sb.append("; requestId='").append(requestId).append('\''); + } + + if (!contentType.isEmpty()) { + sb.append("; contentType='").append(contentType).append('\''); + } + + if (!body.isEmpty()) { + sb.append("; response '").append(body).append('\''); + } + return sb.toString(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java index 37cfa6f1d29..1bd7da920bb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension; import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee; import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; +import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.HttpException; /** * Provides tokens based on custom implementation, following the Adapter Design @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; public final class CustomTokenProviderAdapter extends AccessTokenProvider implements BoundDTExtension { + private final int fetchTokenRetryCount; private CustomTokenProviderAdaptee adaptee; private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); @@ -45,17 +47,57 @@ public final class CustomTokenProviderAdapter extends AccessTokenProvider * Constructs a token provider based on the custom token provider. * * @param adaptee the custom token provider + * @param customTokenFetchRetryCount max retry count for customTokenFetch */ - public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) { + public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee, int customTokenFetchRetryCount) { Preconditions.checkNotNull(adaptee, "adaptee"); this.adaptee = adaptee; + fetchTokenRetryCount = customTokenFetchRetryCount; } protected AzureADToken refreshToken() throws IOException { LOG.debug("AADToken: refreshing custom based token"); AzureADToken azureADToken = new AzureADToken(); - azureADToken.setAccessToken(adaptee.getAccessToken()); + + String accessToken = null; + + Exception ex; + boolean succeeded = false; + // Custom token providers should have their own retry policies, + // Providing a linear retry option for the the retry count + // mentioned in config "fs.azure.custom.token.fetch.retry.count" + int retryCount = fetchTokenRetryCount; + do { + ex = null; + try { + accessToken = adaptee.getAccessToken(); + LOG.trace("CustomTokenProvider Access token fetch was successful with retry count {}", + (fetchTokenRetryCount - retryCount)); + } catch (Exception e) { + LOG.debug("CustomTokenProvider Access token fetch failed with retry count {}", + (fetchTokenRetryCount - retryCount)); + ex = e; + } + + succeeded = (ex == null); + retryCount--; + } while (!succeeded && (retryCount) >= 0); + + if (!succeeded) { + HttpException httpEx = new HttpException( + -1, + "", + String.format("CustomTokenProvider getAccessToken threw %s : %s", + ex.getClass().getTypeName(), ex.getMessage()), + "", + "", + "" + ); + throw httpEx; + } + + azureADToken.setAccessToken(accessToken); azureADToken.setExpiry(adaptee.getExpiryTime()); return azureADToken; diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 01c1fbd03b3..e1a2fca9397 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -539,6 +539,8 @@ token when its `getAccessToken()` method is invoked. The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee` and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`. +The declared class also holds responsibility to implement retry logic while fetching access tokens. + ## Technical notes ### Proxy setup diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java index 6f6982652e4..1d86de7ebeb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsRestOperationException.java @@ -79,12 +79,13 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest } @Test - public void testRequestRetryConfig() throws Exception { - testRetryLogic(0); - testRetryLogic(3); + public void testCustomTokenFetchRetryCount() throws Exception { + testWithDifferentCustomTokenFetchRetry(0); + testWithDifferentCustomTokenFetchRetry(3); + testWithDifferentCustomTokenFetchRetry(5); } - public void testRetryLogic(int numOfRetries) throws Exception { + public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exception { AzureBlobFileSystem fs = this.getFileSystem(); Configuration config = new Configuration(this.getRawConfiguration()); @@ -93,7 +94,7 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest config.set("fs.azure.account.auth.type." + accountName, "Custom"); config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs" + ".azurebfs.oauth2.RetryTestTokenProvider"); - config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries)); + config.set("fs.azure.custom.token.fetch.retry.count", Integer.toString(numOfRetries)); // Stop filesystem creation as it will lead to calls to store. config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false"); @@ -110,7 +111,7 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest // Number of retries done should be as configured Assert.assertTrue( "Number of token fetch retries (" + RetryTestTokenProvider.reTryCount - + ") done, does not match with max " + "retry count configured (" + numOfRetries + + ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries + ")", RetryTestTokenProvider.reTryCount == numOfRetries); } } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java new file mode 100644 index 00000000000..e10419f148b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; + +/** + * Unit test TestExponentialRetryPolicy. + */ +public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { + + private final int maxRetryCount = 30; + private final int noRetryCount = 0; + private final int retryCount = new Random().nextInt(maxRetryCount); + private final int retryCountBeyondMax = maxRetryCount + 1; + + + public TestExponentialRetryPolicy() throws Exception { + super(); + } + + @Test + public void testDifferentMaxIORetryCount() throws Exception { + AbfsConfiguration abfsConfig = getAbfsConfig(); + abfsConfig.setMaxIoRetries(noRetryCount); + testMaxIOConfig(abfsConfig); + abfsConfig.setMaxIoRetries(retryCount); + testMaxIOConfig(abfsConfig); + abfsConfig.setMaxIoRetries(retryCountBeyondMax); + testMaxIOConfig(abfsConfig); + } + + @Test + public void testDefaultMaxIORetryCount() throws Exception { + AbfsConfiguration abfsConfig = getAbfsConfig(); + Assert.assertTrue( + String.format("default maxIORetry count is %s.", maxRetryCount), + abfsConfig.getMaxIoRetries() == maxRetryCount); + testMaxIOConfig(abfsConfig); + } + + private AbfsConfiguration getAbfsConfig() throws Exception { + Configuration + config = new Configuration(this.getRawConfiguration()); + return new AbfsConfiguration(config, "dummyAccountName"); + } + + private void testMaxIOConfig(AbfsConfiguration abfsConfig) { + ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy( + abfsConfig.getMaxIoRetries()); + int localRetryCount = 0; + + while (localRetryCount < abfsConfig.getMaxIoRetries()) { + Assert.assertTrue( + "Retry should be allowed when retryCount less than max count configured.", + retryPolicy.shouldRetry(localRetryCount, -1)); + localRetryCount++; + } + + Assert.assertTrue( + "When all retries are exhausted, the retryCount will be same as max configured", + localRetryCount == abfsConfig.getMaxIoRetries()); + } +}