HADOOP-16778. ABFS: Backport HADOOP-16660 ABFS: Make RetryCount in ExponentialRetryPolicy Configurable to Branch-2.
Contributed by Sneha Vijayarajan.
This commit is contained in:
parent
17a0bf8d78
commit
3bc2e26cd7
@ -938,7 +938,9 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
|
|||||||
tokenProvider = abfsConfiguration.getTokenProvider();
|
tokenProvider = abfsConfiguration.getTokenProvider();
|
||||||
}
|
}
|
||||||
|
|
||||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
|
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||||
|
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||||
|
tokenProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getOctalNotation(FsPermission fsPermission) {
|
private String getOctalNotation(FsPermission fsPermission) {
|
||||||
|
@ -122,8 +122,11 @@ public AbfsHttpOperation getResult() {
|
|||||||
*/
|
*/
|
||||||
void execute() throws AzureBlobFileSystemException {
|
void execute() throws AzureBlobFileSystemException {
|
||||||
int retryCount = 0;
|
int retryCount = 0;
|
||||||
|
LOG.debug("First execution of REST operation - {}", operationType);
|
||||||
while (!executeHttpOperation(retryCount++)) {
|
while (!executeHttpOperation(retryCount++)) {
|
||||||
try {
|
try {
|
||||||
|
LOG.debug("Retrying REST operation {}. RetryCount = {}",
|
||||||
|
operationType, retryCount);
|
||||||
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -25,11 +25,6 @@
|
|||||||
* Retry policy used by AbfsClient.
|
* Retry policy used by AbfsClient.
|
||||||
* */
|
* */
|
||||||
public class ExponentialRetryPolicy {
|
public class ExponentialRetryPolicy {
|
||||||
/**
|
|
||||||
* Represents the default number of retry attempts.
|
|
||||||
*/
|
|
||||||
private static final int DEFAULT_CLIENT_RETRY_COUNT = 30;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the default amount of time used when calculating a random delta in the exponential
|
* Represents the default amount of time used when calculating a random delta in the exponential
|
||||||
* delay between retries.
|
* delay between retries.
|
||||||
@ -86,8 +81,10 @@ public class ExponentialRetryPolicy {
|
|||||||
/**
|
/**
|
||||||
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
|
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
|
||||||
*/
|
*/
|
||||||
public ExponentialRetryPolicy() {
|
public ExponentialRetryPolicy(final int maxIoRetries) {
|
||||||
this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
|
|
||||||
|
this(maxIoRetries, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF,
|
||||||
|
DEFAULT_CLIENT_BACKOFF);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,12 +20,17 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.oauth2.RetryTestTokenProvider;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the AbfsRestOperationException error message format.
|
* Verify the AbfsRestOperationException error message format.
|
||||||
* */
|
* */
|
||||||
@ -72,4 +77,40 @@ public void testAbfsRestOperationExceptionFormat() throws IOException {
|
|||||||
&& errorFields[5].contains("Time"));
|
&& errorFields[5].contains("Time"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestRetryConfig() throws Exception {
|
||||||
|
testRetryLogic(0);
|
||||||
|
testRetryLogic(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRetryLogic(int numOfRetries) throws Exception {
|
||||||
|
AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
|
|
||||||
|
Configuration config = new Configuration(this.getRawConfiguration());
|
||||||
|
String accountName = config.get("fs.azure.abfs.account.name");
|
||||||
|
// Setup to configure custom token provider
|
||||||
|
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
||||||
|
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
||||||
|
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
||||||
|
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
|
||||||
|
// Stop filesystem creation as it will lead to calls to store.
|
||||||
|
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
||||||
|
|
||||||
|
final AzureBlobFileSystem fs1 =
|
||||||
|
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
||||||
|
config);
|
||||||
|
RetryTestTokenProvider.ResetStatusToFirstTokenFetch();
|
||||||
|
try {
|
||||||
|
fs1.getFileStatus(new Path("/"));
|
||||||
|
} catch (Exception ex) {
|
||||||
|
// Expected to fail as
|
||||||
|
}
|
||||||
|
|
||||||
|
// Number of retries done should be as configured
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||||
|
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
|
||||||
|
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||||
|
}
|
||||||
}
|
}
|
@ -0,0 +1,67 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Token provider which should throw exception and trigger retries
|
||||||
|
*/
|
||||||
|
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
|
|
||||||
|
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||||
|
private static boolean isThisFirstTokenFetch = true;
|
||||||
|
public static int reTryCount = 0;
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(RetryTestTokenProvider.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration configuration, String accountName)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void ResetStatusToFirstTokenFetch() {
|
||||||
|
isThisFirstTokenFetch = true;
|
||||||
|
reTryCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAccessToken() throws IOException {
|
||||||
|
if (isThisFirstTokenFetch) {
|
||||||
|
isThisFirstTokenFetch = false;
|
||||||
|
} else {
|
||||||
|
reTryCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
||||||
|
throw new IOException("test exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Date getExpiryTime() {
|
||||||
|
return new Date();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user