HADOOP-16660. ABFS: Make RetryCount in ExponentialRetryPolicy Configurable.
Contributed by Sneha Vijayarajan.
This commit is contained in:
parent
c225efe237
commit
aa9cd0a2d6
|
@ -1118,7 +1118,9 @@ public class AzureBlobFileSystemStore {
|
|||
tokenProvider = abfsConfiguration.getTokenProvider();
|
||||
}
|
||||
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider, abfsPerfTracker);
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||
tokenProvider, abfsPerfTracker);
|
||||
}
|
||||
|
||||
private String getOctalNotation(FsPermission fsPermission) {
|
||||
|
@ -1342,4 +1344,4 @@ public class AzureBlobFileSystemStore {
|
|||
AbfsClient getClient() {
|
||||
return this.client;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -130,8 +130,11 @@ public class AbfsRestOperation {
|
|||
}
|
||||
|
||||
int retryCount = 0;
|
||||
LOG.debug("First execution of REST operation - {}", operationType);
|
||||
while (!executeHttpOperation(retryCount++)) {
|
||||
try {
|
||||
LOG.debug("Retrying REST operation {}. RetryCount = {}",
|
||||
operationType, retryCount);
|
||||
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
@ -25,11 +25,6 @@ import java.net.HttpURLConnection;
|
|||
* Retry policy used by AbfsClient.
|
||||
* */
|
||||
public class ExponentialRetryPolicy {
|
||||
/**
|
||||
* Represents the default number of retry attempts.
|
||||
*/
|
||||
private static final int DEFAULT_CLIENT_RETRY_COUNT = 30;
|
||||
|
||||
/**
|
||||
* Represents the default amount of time used when calculating a random delta in the exponential
|
||||
* delay between retries.
|
||||
|
@ -86,8 +81,10 @@ public class ExponentialRetryPolicy {
|
|||
/**
|
||||
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
|
||||
*/
|
||||
public ExponentialRetryPolicy() {
|
||||
this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
|
||||
public ExponentialRetryPolicy(final int maxIoRetries) {
|
||||
|
||||
this(maxIoRetries, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF,
|
||||
DEFAULT_CLIENT_BACKOFF);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,12 +20,17 @@ package org.apache.hadoop.fs.azurebfs;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.RetryTestTokenProvider;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Verify the AbfsRestOperationException error message format.
|
||||
* */
|
||||
|
@ -72,4 +77,40 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
|||
&& errorFields[5].contains("Time"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestRetryConfig() throws Exception {
|
||||
testRetryLogic(0);
|
||||
testRetryLogic(3);
|
||||
}
|
||||
|
||||
public void testRetryLogic(int numOfRetries) throws Exception {
|
||||
AzureBlobFileSystem fs = this.getFileSystem();
|
||||
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
String accountName = config.get("fs.azure.abfs.account.name");
|
||||
// Setup to configure custom token provider
|
||||
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
||||
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
||||
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
||||
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
|
||||
// Stop filesystem creation as it will lead to calls to store.
|
||||
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
||||
|
||||
final AzureBlobFileSystem fs1 =
|
||||
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
||||
config);
|
||||
RetryTestTokenProvider.ResetStatusToFirstTokenFetch();
|
||||
|
||||
intercept(Exception.class,
|
||||
()-> {
|
||||
fs1.getFileStatus(new Path("/"));
|
||||
});
|
||||
|
||||
// Number of retries done should be as configured
|
||||
Assert.assertTrue(
|
||||
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
|
||||
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||
}
|
||||
}
|
|
@ -182,4 +182,4 @@ public class TestAbfsConfigurationFieldsValidation {
|
|||
assertEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test Token provider which should throw exception and trigger retries
|
||||
*/
|
||||
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||
|
||||
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||
private static boolean isThisFirstTokenFetch = true;
|
||||
public static int reTryCount = 0;
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(RetryTestTokenProvider.class);
|
||||
|
||||
@Override
|
||||
public void initialize(Configuration configuration, String accountName)
|
||||
throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public static void ResetStatusToFirstTokenFetch() {
|
||||
isThisFirstTokenFetch = true;
|
||||
reTryCount = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAccessToken() throws IOException {
|
||||
if (isThisFirstTokenFetch) {
|
||||
isThisFirstTokenFetch = false;
|
||||
} else {
|
||||
reTryCount++;
|
||||
}
|
||||
|
||||
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
||||
throw new IOException("test exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Date getExpiryTime() {
|
||||
return new Date();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue