Revert "HADOOP-17873. ABFS: Fix transient failures in ITestAbfsStreamStatistics and ITestAbfsRestOperationException (#3341)"
This reverts commit 82658a22d6
.
This commit is contained in:
parent
e1ac10ceae
commit
45f164a854
|
@ -555,7 +555,6 @@
|
||||||
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
|
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
|
||||||
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
||||||
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
|
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
|
||||||
<exclude>**/azurebfs/ITestAbfsStreamStatistics*.java</exclude>
|
|
||||||
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
|
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
|
||||||
</excludes>
|
</excludes>
|
||||||
|
|
||||||
|
@ -598,7 +597,6 @@
|
||||||
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
||||||
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
|
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
|
||||||
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
|
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
|
||||||
<include>**/azurebfs/ITestAbfsStreamStatistics*.java</include>
|
|
||||||
</includes>
|
</includes>
|
||||||
</configuration>
|
</configuration>
|
||||||
</execution>
|
</execution>
|
||||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -139,9 +138,4 @@ public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
||||||
String suffix = ExtensionHelper.getUserAgentSuffix(adaptee, "");
|
String suffix = ExtensionHelper.getUserAgentSuffix(adaptee, "");
|
||||||
return suffix != null ? suffix : "";
|
return suffix != null ? suffix : "";
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected CustomTokenProviderAdaptee getCustomTokenProviderAdaptee() {
|
|
||||||
return adaptee;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1223,9 +1223,4 @@ public class AbfsClient implements Closeable {
|
||||||
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
|
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
|
||||||
Futures.addCallback(future, callback, executorService);
|
Futures.addCallback(future, callback, executorService);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected AccessTokenProvider getTokenProvider() {
|
|
||||||
return tokenProvider;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,12 +37,10 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
|
||||||
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
|
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
|
||||||
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
||||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||||
|
@ -243,9 +241,6 @@ public abstract class AbstractAbfsIntegrationTest extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
|
|
||||||
return TestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void loadConfiguredFileSystem() throws Exception {
|
public void loadConfiguredFileSystem() throws Exception {
|
||||||
// disable auto-creation of filesystem
|
// disable auto-creation of filesystem
|
||||||
|
|
|
@ -111,10 +111,7 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
||||||
final AzureBlobFileSystem fs1 =
|
final AzureBlobFileSystem fs1 =
|
||||||
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
||||||
config);
|
config);
|
||||||
RetryTestTokenProvider retryTestTokenProvider
|
RetryTestTokenProvider.ResetStatusToFirstTokenFetch();
|
||||||
= RetryTestTokenProvider.getCurrentRetryTestProviderInstance(
|
|
||||||
getAccessTokenProvider(fs1));
|
|
||||||
retryTestTokenProvider.resetStatusToFirstTokenFetch();
|
|
||||||
|
|
||||||
intercept(Exception.class,
|
intercept(Exception.class,
|
||||||
()-> {
|
()-> {
|
||||||
|
@ -122,10 +119,10 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
||||||
});
|
});
|
||||||
|
|
||||||
// Number of retries done should be as configured
|
// Number of retries done should be as configured
|
||||||
Assert.assertEquals(
|
Assert.assertTrue(
|
||||||
"Number of token fetch retries done does not match with fs.azure"
|
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||||
+ ".custom.token.fetch.retry.count configured", numOfRetries,
|
+ ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries
|
||||||
retryTestTokenProvider.getRetryCount());
|
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -30,12 +30,12 @@ import org.slf4j.LoggerFactory;
|
||||||
*/
|
*/
|
||||||
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
|
||||||
RetryTestTokenProvider.class);
|
|
||||||
|
|
||||||
// Need to track first token fetch otherwise will get counted as a retry too.
|
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||||
private boolean isThisFirstTokenFetch = true;
|
private static boolean isThisFirstTokenFetch = true;
|
||||||
private int retryCount = 0;
|
public static int reTryCount = 0;
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(RetryTestTokenProvider.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration configuration, String accountName)
|
public void initialize(Configuration configuration, String accountName)
|
||||||
|
@ -43,13 +43,9 @@ public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public static void ResetStatusToFirstTokenFetch() {
|
||||||
* Clear earlier retry details and reset RetryTestTokenProvider instance to
|
|
||||||
* state of first access token fetch call.
|
|
||||||
*/
|
|
||||||
public void resetStatusToFirstTokenFetch() {
|
|
||||||
isThisFirstTokenFetch = true;
|
isThisFirstTokenFetch = true;
|
||||||
retryCount = 0;
|
reTryCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -57,7 +53,7 @@ public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
if (isThisFirstTokenFetch) {
|
if (isThisFirstTokenFetch) {
|
||||||
isThisFirstTokenFetch = false;
|
isThisFirstTokenFetch = false;
|
||||||
} else {
|
} else {
|
||||||
retryCount++;
|
reTryCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
||||||
|
@ -68,13 +64,4 @@ public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
public Date getExpiryTime() {
|
public Date getExpiryTime() {
|
||||||
return new Date();
|
return new Date();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RetryTestTokenProvider getCurrentRetryTestProviderInstance(
|
|
||||||
AccessTokenProvider customTokenProvider) {
|
|
||||||
return (RetryTestTokenProvider) ((CustomTokenProviderAdapter) customTokenProvider).getCustomTokenProviderAdaptee();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getRetryCount() {
|
|
||||||
return retryCount;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -395,8 +395,4 @@ public final class TestAbfsClient {
|
||||||
url,
|
url,
|
||||||
requestHeaders);
|
requestHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
|
|
||||||
return client.getTokenProvider();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue