HADOOP-18012. ABFS: Enable config controlled ETag check for Rename idempotency (#5488)

To support recovery of network failures during rename, the abfs client
fetches the etag of the source file, and when recovering from a
failure, uses this tag to determine whether the rename succeeded
before the failure happened.

* This works for files, but not directories
* It adds the overhead of a HEAD request before each rename.
* The option can be disabled by setting "fs.azure.enable.rename.resilience"
  to false

Contributed by Sree Bhattacharyya
This commit is contained in:
sreeb-msft 2023-03-31 23:45:15 +05:30 committed by Steve Loughran
parent 42ed2b9075
commit f324efd247
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
11 changed files with 621 additions and 96 deletions

View File

@ -333,6 +333,10 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_ABFS_LIST_ITERATOR, DefaultValue = DEFAULT_ENABLE_ABFS_LIST_ITERATOR)
private boolean enableAbfsListIterator;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
private boolean renameResilience;
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
@ -1139,4 +1143,11 @@ public class AbfsConfiguration{
this.enableAbfsListIterator = enableAbfsListIterator;
}
public boolean getRenameResilience() {
return renameResilience;
}
void setRenameResilience(boolean actualResilience) {
renameResilience = actualResilience;
}
}

View File

@ -201,9 +201,9 @@ public class AzureBlobFileSystem extends FileSystem
tracingHeaderFormat = abfsConfiguration.getTracingHeaderFormat();
this.setWorkingDirectory(this.getHomeDirectory());
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
this.createFileSystem(tracingContext);
@ -442,7 +442,7 @@ public class AzureBlobFileSystem extends FileSystem
}
// Non-HNS account need to check dst status on driver side.
if (!abfsStore.getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
if (!getIsNamespaceEnabled(tracingContext) && dstFileStatus == null) {
dstFileStatus = tryGetFileStatus(qualifiedDstPath, tracingContext);
}

View File

@ -923,9 +923,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
do {
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
final AbfsClientRenameResult abfsClientRenameResult =
client.renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag, false);
continuation, tracingContext, sourceEtag, false,
isNamespaceEnabled);
AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());

View File

@ -238,6 +238,9 @@ public final class ConfigurationKeys {
/** Key for rate limit capacity, as used by IO operations which try to throttle themselves. */
public static final String FS_AZURE_ABFS_IO_RATE_LIMIT = "fs.azure.io.rate.limit";
/** Add extra resilience to rename failures, at the expense of performance. */
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";
public static String accountProperty(String property, String account) {
return property + "." + account;
}

View File

@ -118,6 +118,7 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
/**
* Limit of queued block upload operations before writes

View File

@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@ -68,6 +69,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
@ -77,8 +79,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.S
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
/**
* AbfsClient.
@ -106,9 +108,12 @@ public class AbfsClient implements Closeable {
private final ListeningScheduledExecutorService executorService;
/** logging the rename failure if metadata is in an incomplete state. */
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
new LogExactlyOnce(LOG);
private boolean renameResilience;
/**
* logging the rename failure if metadata is in an incomplete state.
*/
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG);
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
@ -123,6 +128,7 @@ public class AbfsClient implements Closeable {
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
this.renameResilience = abfsConfiguration.getRenameResilience();
String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
@ -504,27 +510,55 @@ public class AbfsClient implements Closeable {
* took place.
* As rename recovery is only attempted if the source etag is non-empty,
* in normal rename operations rename recovery will never happen.
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
*
* @param source path to source file
* @param destination destination of rename.
* @param continuation continuation.
* @param tracingContext trace context
* @param sourceEtag etag of source file. may be null or empty
* @param isMetadataIncompleteState was there a rename failure due to
* incomplete metadata state?
* @param isNamespaceEnabled whether namespace enabled account or not
* @return AbfsClientRenameResult result of rename operation indicating the
* AbfsRest operation, rename recovery and incomplete metadata state failure.
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
*/
public AbfsClientRenameResult renamePath(
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
final String sourceEtag,
boolean isMetadataIncompleteState)
final String source,
final String destination,
final String continuation,
final TracingContext tracingContext,
String sourceEtag,
boolean isMetadataIncompleteState,
boolean isNamespaceEnabled)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final boolean hasEtag = !isEmpty(sourceEtag);
boolean shouldAttemptRecovery = renameResilience && isNamespaceEnabled;
if (!hasEtag && shouldAttemptRecovery) {
// in case eTag is already not supplied to the API
// and rename resilience is expected and it is an HNS enabled account
// fetch the source etag to be used later in recovery
try {
final AbfsRestOperation srcStatusOp = getPathStatus(source,
false, tracingContext);
if (srcStatusOp.hasResult()) {
final AbfsHttpOperation result = srcStatusOp.getResult();
sourceEtag = extractEtagHeader(result);
// and update the directory status.
boolean isDir = checkIsDir(result);
shouldAttemptRecovery = !isDir;
LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir);
}
} catch (AbfsRestOperationException e) {
throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(),
e.getMessage(), e);
}
}
String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
if (authType == AuthType.SAS) {
final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
@ -541,12 +575,7 @@ public class AbfsClient implements Closeable {
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders);
try {
incrementAbfsRenamePath();
op.execute(tracingContext);
@ -557,48 +586,74 @@ public class AbfsClient implements Closeable {
// isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState);
} catch (AzureBlobFileSystemException e) {
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}
// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
&& !isMetadataIncompleteState) {
// Logging once
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
// If we have no HTTP response, throw the original exception.
if (!op.hasResult()) {
throw e;
}
// ref: HADOOP-18242. Rename failure occurring due to a rare case of
// tracking metadata being in incomplete state.
if (op.getResult().getStorageErrorCode()
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
&& !isMetadataIncompleteState) {
//Logging
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
// rename recovery should be attempted in this case also
shouldAttemptRecovery = true;
isMetadataIncompleteState = true;
String sourceEtagAfterFailure = sourceEtag;
if (isEmpty(sourceEtagAfterFailure)) {
// Doing a HEAD call resolves the incomplete metadata state and
// then we can retry the rename operation.
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
tracingContext);
isMetadataIncompleteState = true;
// Extract the sourceEtag, using the status Op, and set it
// for future rename recovery.
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState);
sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;
renamePath(source, destination, continuation, tracingContext,
sourceEtagAfterFailure, isMetadataIncompleteState, isNamespaceEnabled);
}
// if we get out of the condition without a successful rename, then
// it isn't metadata incomplete state issue.
isMetadataIncompleteState = false;
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
if (!etagCheckSucceeded) {
// idempotency did not return different result
// throw back the exception
throw e;
}
// setting default rename recovery success to false
boolean etagCheckSucceeded = false;
if (shouldAttemptRecovery) {
etagCheckSucceeded = renameIdempotencyCheckOp(
source,
sourceEtag, op, destination, tracingContext);
}
if (!etagCheckSucceeded) {
// idempotency did not return different result
// throw back the exception
throw e;
}
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
}
}
private boolean checkIsDir(AbfsHttpOperation result) {
String resourceType = result.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
return resourceType != null
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}
@VisibleForTesting
AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) {
AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
return op;
}
private void incrementAbfsRenamePath() {
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
}
@ -628,28 +683,44 @@ public class AbfsClient implements Closeable {
TracingContext tracingContext) {
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& isNotEmpty(sourceEtag)) {
// Server has returned HTTP 404, which means rename source no longer
// exists. Check on destination status and if its etag matches
// that of the source, consider it to be a success.
LOG.debug("rename {} to {} failed, checking etag of destination",
source, destination);
// removing isDir from debug logs as it can be misleading
LOG.debug("rename({}, {}) failure {}; retry={} etag {}",
source, destination, op.getResult().getStatusCode(), op.isARetriedRequest(), sourceEtag);
if (!(op.isARetriedRequest()
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND))) {
// only attempt recovery if the failure was a 404 on a retried rename request.
return false;
}
if (isNotEmpty(sourceEtag)) {
// Server has returned HTTP 404, we have an etag, so see
// if the rename has actually taken place,
LOG.info("rename {} to {} failed, checking etag of destination",
source, destination);
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false, tracingContext);
final AbfsRestOperation destStatusOp = getPathStatus(destination, false, tracingContext);
final AbfsHttpOperation result = destStatusOp.getResult();
return result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
} catch (AzureBlobFileSystemException ignored) {
final boolean recovered = result.getStatusCode() == HttpURLConnection.HTTP_OK
&& sourceEtag.equals(extractEtagHeader(result));
LOG.info("File rename has taken place: recovery {}",
recovered ? "succeeded" : "failed");
return recovered;
} catch (AzureBlobFileSystemException ex) {
// GetFileStatus on the destination failed, the rename did not take place
// or some other failure. log and swallow.
LOG.debug("Failed to get status of path {}", destination, ex);
}
} else {
LOG.debug("No source etag; unable to probe for the operation's success");
}
return false;
return false;
}
@VisibleForTesting
boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) {
return sourceEtag.equals(extractEtagHeader(result));
}
public AbfsRestOperation append(final String path, final byte[] buffer,

View File

@ -58,4 +58,16 @@ public class AbfsClientRenameResult {
public boolean isIncompleteMetadataState() {
return isIncompleteMetadataState;
}
@Override
public String toString() {
return "AbfsClientRenameResult{"
+ "op="
+ op
+ ", renameRecovered="
+ renameRecovered
+ ", isIncompleteMetadataState="
+ isIncompleteMetadataState
+ '}';
}
}

View File

@ -277,26 +277,8 @@ public class AbfsRestOperation {
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
tracingContext.constructHeader(httpOperation, failureReason);
switch(client.getAuthType()) {
case Custom:
case OAuth:
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
break;
case SAS:
// do nothing; the SAS token should already be appended to the query string
httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
break;
case SharedKey:
// sign the HTTP request
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
hasRequestBody ? bufferLength : 0);
break;
}
signRequest(httpOperation, hasRequestBody ? bufferLength : 0);
} catch (IOException e) {
LOG.debug("Auth failure: {}, {}", method, url);
throw new AbfsRestOperationException(-1, null,
@ -377,6 +359,37 @@ public class AbfsRestOperation {
return true;
}
/**
* Sign an operation.
* @param httpOperation operation to sign
* @param bytesToSign how many bytes to sign for shared key auth.
* @throws IOException failure
*/
@VisibleForTesting
public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException {
switch(client.getAuthType()) {
case Custom:
case OAuth:
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
break;
case SAS:
// do nothing; the SAS token should already be appended to the query string
httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
break;
case SharedKey:
default:
// sign the HTTP request
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
bytesToSign);
break;
}
}
/**
* Creates new object of {@link AbfsHttpOperation} with the url, method, and
* requestHeaders fields of the AbfsRestOperation object.

View File

@ -70,6 +70,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
private static final Logger LOG =
LoggerFactory.getLogger(ITestAzureBlobFileSystemDelegationSAS.class);
private boolean isHNSEnabled;
public ITestAzureBlobFileSystemDelegationSAS() throws Exception {
// These tests rely on specific settings in azure-auth-keys.xml:
String sasProvider = getRawConfiguration().get(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE);
@ -85,7 +87,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
@Override
public void setup() throws Exception {
boolean isHNSEnabled = this.getConfiguration().getBoolean(
isHNSEnabled = this.getConfiguration().getBoolean(
TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
Assume.assumeTrue(isHNSEnabled);
createFilesystemForSASTests();
@ -401,7 +403,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
fs.create(new Path(src)).close();
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
.renamePath(src, "/testABC" + "/abc.txt", null,
getTestTracingContext(fs, false), null, false)
getTestTracingContext(fs, false), null, false, isHNSEnabled)
.getOp();
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
String url = result.getMaskedUrl();
@ -419,7 +421,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
intercept(IOException.class, "sig=XXXX",
() -> getFileSystem().getAbfsClient()
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
getTestTracingContext(getFileSystem(), false), null, false));
getTestTracingContext(getFileSystem(), false), null, false, isHNSEnabled));
}
@Test

View File

@ -99,10 +99,14 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
private static final int FILE_SIZE = 10 * ONE_MB;
private static final int FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS = 24 * ONE_MB;
private boolean isNamespaceEnabled;
public ITestCustomerProvidedKey() throws Exception {
boolean isCPKTestsEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_CPK_ENABLED, false);
Assume.assumeTrue(isCPKTestsEnabled);
isNamespaceEnabled = getConfiguration()
.getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
@Test
@ -526,7 +530,7 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
AbfsClient abfsClient = fs.getAbfsClient();
AbfsRestOperation abfsRestOperation = abfsClient
.renamePath(testFileName, newName, null,
getTestTracingContext(fs, false), null, false)
getTestTracingContext(fs, false), null, false, isNamespaceEnabled)
.getOp();
assertCPKHeaders(abfsRestOperation, false);
assertNoCPKResponseHeadersPresent(abfsRestOperation);

View File

@ -18,19 +18,44 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
import java.net.URL;
import java.time.Duration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -45,7 +70,11 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestAbfsRenameRetryRecovery.class);
private boolean isNamespaceEnabled;
public TestAbfsRenameRetryRecovery() throws Exception {
isNamespaceEnabled = getConfiguration()
.getBoolean(TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
/**
@ -90,7 +119,7 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
// We need to throw an exception once a rename is triggered with
// destination having no parent, but after a retry it needs to succeed.
when(mockClient.renamePath(sourcePath, destNoParentPath, null, null,
null, false))
null, false, isNamespaceEnabled))
.thenThrow(destParentNotFound)
.thenReturn(recoveredMetaDataIncompleteResult);
@ -98,12 +127,12 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
intercept(AzureBlobFileSystemException.class,
() -> mockClient.renamePath(sourcePath,
destNoParentPath, null, null,
null, false));
null, false, isNamespaceEnabled));
AbfsClientRenameResult resultOfSecondRenameCall =
mockClient.renamePath(sourcePath,
destNoParentPath, null, null,
null, false);
null, false, isNamespaceEnabled);
// the second rename call should be the recoveredResult due to
// metaDataIncomplete
@ -119,10 +148,387 @@ public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
// Verify renamePath occurred two times implying a retry was attempted.
verify(mockClient, times(2))
.renamePath(sourcePath, destNoParentPath, null, null, null, false);
.renamePath(sourcePath, destNoParentPath, null, null, null, false,
isNamespaceEnabled);
}
AbfsClient getMockAbfsClient() throws IOException {
AzureBlobFileSystem fs = getFileSystem();
// adding mock objects to current AbfsClient
AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient());
Mockito.doAnswer(answer -> {
AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath,
spyClient, HTTP_METHOD_PUT, answer.getArgument(0), answer.getArgument(1));
AbfsRestOperation spiedOp = Mockito.spy(op);
addSpyBehavior(spiedOp, op, spyClient);
return spiedOp;
}).when(spyClient).createRenameRestOperation(Mockito.any(URL.class), anyList());
return spyClient;
}
/**
* Spies on a rest operation to inject transient failure.
* the first createHttpOperation() invocation will return an abfs rest operation
* which will fail.
* @param spiedRestOp spied operation whose createHttpOperation() will fail first time
* @param normalRestOp normal operation the good operation
* @param client client.
* @throws IOException failure
*/
private void addSpyBehavior(final AbfsRestOperation spiedRestOp,
final AbfsRestOperation normalRestOp,
final AbfsClient client)
throws IOException {
AbfsHttpOperation failingOperation = Mockito.spy(normalRestOp.createHttpOperation());
AbfsHttpOperation normalOp1 = normalRestOp.createHttpOperation();
executeThenFail(client, normalRestOp, failingOperation, normalOp1);
AbfsHttpOperation normalOp2 = normalRestOp.createHttpOperation();
normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
when(spiedRestOp.createHttpOperation())
.thenReturn(failingOperation)
.thenReturn(normalOp2);
}
/**
* Mock an idempotency failure by executing the normal operation, then
* raising an IOE.
* @param normalRestOp the rest operation used to sign the requests.
* @param failingOperation failing operation
* @param normalOp good operation
* @throws IOException failure
*/
private void executeThenFail(final AbfsClient client,
final AbfsRestOperation normalRestOp,
final AbfsHttpOperation failingOperation,
final AbfsHttpOperation normalOp)
throws IOException {
Mockito.doAnswer(answer -> {
LOG.info("Executing first attempt with post-operation fault injection");
final byte[] buffer = answer.getArgument(0);
final int offset = answer.getArgument(1);
final int length = answer.getArgument(2);
normalRestOp.signRequest(normalOp, length);
normalOp.sendRequest(buffer, offset, length);
normalOp.processResponse(buffer, offset, length);
LOG.info("Actual outcome is {} \"{}\" \"{}\"; injecting failure",
normalOp.getStatusCode(),
normalOp.getStorageErrorCode(),
normalOp.getStorageErrorMessage());
throw new SocketException("connection-reset");
}).when(failingOperation).sendRequest(Mockito.nullable(byte[].class),
Mockito.nullable(int.class), Mockito.nullable(int.class));
}
/**
* This is the good outcome: resilient rename.
*/
@Test
public void testRenameRecoveryEtagMatchFsLevel() throws IOException {
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
TracingContext testTracingContext = getTestTracingContext(fs, false);
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
AbfsClient mockClient = getMockAbfsClient();
String base = "/" + getMethodName();
String path1 = base + "/dummyFile1";
String path2 = base + "/dummyFile2";
touch(new Path(path1));
setAbfsClient(abfsStore, mockClient);
// checking correct count in AbfsCounters
AbfsCounters counter = mockClient.getAbfsCounters();
IOStatistics ioStats = counter.getIOStatistics();
Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName());
Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName());
// 404 and retry, send sourceEtag as null
// source eTag matches -> rename should pass even when execute throws exception
fs.rename(new Path(path1), new Path(path2));
// validating stat counters after rename
// 4 calls should have happened in total for rename
// 1 -> original rename rest call, 2 -> first retry,
// +2 for getPathStatus calls
assertThatStatisticCounter(ioStats,
CONNECTIONS_MADE.getStatName())
.isEqualTo(4 + connMadeBeforeRename);
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
// retries happen internally within AbfsRestOperation execute()
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
assertThatStatisticCounter(ioStats,
RENAME_PATH_ATTEMPTS.getStatName())
.isEqualTo(1 + renamePathAttemptsBeforeRename);
}
/**
* execute a failing rename but have the file at the far end not match.
* This is done by explicitly passing in a made up etag for the source
* etag and creating a file at the far end.
* The first rename will actually fail with a path exists exception,
* but as that is swallowed, it's not a problem.
*/
@Test
public void testRenameRecoveryEtagMismatchFsLevel() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
TracingContext testTracingContext = getTestTracingContext(fs, false);
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
AbfsClient mockClient = getMockAbfsClient();
String base = "/" + getMethodName();
String path1 = base + "/dummyFile1";
String path2 = base + "/dummyFile2";
fs.create(new Path(path2));
setAbfsClient(abfsStore, mockClient);
// source eTag does not match -> rename should be a failure
assertEquals(false, fs.rename(new Path(path1), new Path(path2)));
}
@Test
public void testRenameRecoveryFailsForDirFsLevel() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
TracingContext testTracingContext = getTestTracingContext(fs, false);
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
AbfsClient mockClient = getMockAbfsClient();
String dir1 = "/dummyDir1";
String dir2 = "/dummyDir2";
Path path1 = new Path(dir1);
Path path2 = new Path(dir2);
fs.mkdirs(path1);
setAbfsClient(abfsStore, mockClient);
// checking correct count in AbfsCounters
AbfsCounters counter = mockClient.getAbfsCounters();
IOStatistics ioStats = counter.getIOStatistics();
Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName());
Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName());
// source eTag does not match -> rename should be a failure
boolean renameResult = fs.rename(path1, path2);
assertEquals(false, renameResult);
// validating stat counters after rename
// 3 calls should have happened in total for rename
// 1 -> original rename rest call, 2 -> first retry,
// +1 for getPathStatus calls
// last getPathStatus call should be skipped
assertThatStatisticCounter(ioStats,
CONNECTIONS_MADE.getStatName())
.isEqualTo(3 + connMadeBeforeRename);
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
// retries happen internally within AbfsRestOperation execute()
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
assertThatStatisticCounter(ioStats,
RENAME_PATH_ATTEMPTS.getStatName())
.isEqualTo(1 + renamePathAttemptsBeforeRename);
}
/**
* Assert that an exception failed with a specific error code.
* @param code code
* @param e exception
* @throws AbfsRestOperationException if there is a mismatch
*/
private static void expectErrorCode(final AzureServiceErrorCode code,
final AbfsRestOperationException e) throws AbfsRestOperationException {
if (e.getErrorCode() != code) {
throw e;
}
}
/**
* Directory rename failure is unrecoverable.
*/
@Test
public void testDirRenameRecoveryUnsupported() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
TracingContext testTracingContext = getTestTracingContext(fs, false);
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
AbfsClient spyClient = getMockAbfsClient();
String base = "/" + getMethodName();
String path1 = base + "/dummyDir1";
String path2 = base + "/dummyDir2";
fs.mkdirs(new Path(path1));
// source eTag does not match -> throw exception
expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () ->
spyClient.renamePath(path1, path2, null, testTracingContext, null, false,
isNamespaceEnabled)));
}
/**
* Even with failures, having
*/
@Test
public void testExistingPathCorrectlyRejected() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
TracingContext testTracingContext = getTestTracingContext(fs, false);
Assume.assumeTrue(fs.getAbfsStore().getIsNamespaceEnabled(testTracingContext));
AbfsClient spyClient = getMockAbfsClient();
String base = "/" + getMethodName();
String path1 = base + "/dummyDir1";
String path2 = base + "/dummyDir2";
touch(new Path(path1));
touch(new Path(path2));
// source eTag does not match -> throw exception
expectErrorCode(PATH_ALREADY_EXISTS, intercept(AbfsRestOperationException.class, () ->
spyClient.renamePath(path1, path2, null, testTracingContext, null, false,
isNamespaceEnabled)));
}
/**
* Test that rename recovery remains unsupported for
* FNS configurations.
*/
@Test
public void testRenameRecoveryUnsupportedForFlatNamespace() throws Exception {
Assume.assumeTrue(!isNamespaceEnabled);
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
TracingContext testTracingContext = getTestTracingContext(fs, false);
AbfsClient mockClient = getMockAbfsClient();
String base = "/" + getMethodName();
String path1 = base + "/dummyFile1";
String path2 = base + "/dummyFile2";
touch(new Path(path1));
setAbfsClient(abfsStore, mockClient);
// checking correct count in AbfsCounters
AbfsCounters counter = mockClient.getAbfsCounters();
IOStatistics ioStats = counter.getIOStatistics();
Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName());
Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName());
expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () ->
mockClient.renamePath(path1, path2, null, testTracingContext, null, false,
isNamespaceEnabled)));
// validating stat counters after rename
// only 2 calls should have happened in total for rename
// 1 -> original rename rest call, 2 -> first retry,
// no getPathStatus calls
// last getPathStatus call should be skipped
assertThatStatisticCounter(ioStats,
CONNECTIONS_MADE.getStatName())
.isEqualTo(2 + connMadeBeforeRename);
// the RENAME_PATH_ATTEMPTS stat should be incremented by 1
// retries happen internally within AbfsRestOperation execute()
// the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called
assertThatStatisticCounter(ioStats,
RENAME_PATH_ATTEMPTS.getStatName())
.isEqualTo(1 + renamePathAttemptsBeforeRename);
}
/**
* Test the resilient commit code works through fault injection, including
* reporting recovery.
*/
@Test
public void testResilientCommitOperation() throws Throwable {
AzureBlobFileSystem fs = getFileSystem();
TracingContext testTracingContext = getTestTracingContext(fs, false);
final AzureBlobFileSystemStore store = fs.getAbfsStore();
Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext));
// patch in the mock abfs client to the filesystem, for the resilient
// commit API to pick up.
setAbfsClient(store, getMockAbfsClient());
String base = "/" + getMethodName();
String path1 = base + "/dummyDir1";
String path2 = base + "/dummyDir2";
final Path source = new Path(path1);
touch(source);
final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag();
final ResilientCommitByRename commit = fs.createResilientCommitSupport(source);
final Pair<Boolean, Duration> outcome =
commit.commitSingleFileByRename(source, new Path(path2), sourceTag);
Assertions.assertThat(outcome.getKey())
.describedAs("recovery flag")
.isTrue();
}
/**
* Test the resilient commit code works through fault injection, including
* reporting recovery.
*/
@Test
public void testResilientCommitOperationTagMismatch() throws Throwable {
AzureBlobFileSystem fs = getFileSystem();
TracingContext testTracingContext = getTestTracingContext(fs, false);
final AzureBlobFileSystemStore store = fs.getAbfsStore();
Assume.assumeTrue(store.getIsNamespaceEnabled(testTracingContext));
// patch in the mock abfs client to the filesystem, for the resilient
// commit API to pick up.
setAbfsClient(store, getMockAbfsClient());
String base = "/" + getMethodName();
String path1 = base + "/dummyDir1";
String path2 = base + "/dummyDir2";
final Path source = new Path(path1);
touch(source);
final String sourceTag = ((EtagSource) fs.getFileStatus(source)).getEtag();
final ResilientCommitByRename commit = fs.createResilientCommitSupport(source);
intercept(FileNotFoundException.class, () ->
commit.commitSingleFileByRename(source, new Path(path2), "not the right tag"));
}
/**
* Method to create an AbfsRestOperationException.
* @param statusCode status code to be used.