HADOOP-18242. ABFS Rename Failure when tracking metadata is in an incomplete state (#4331)
ABFS rename fails intermittently when the Storage-blob tracking metadata is in an incomplete state. This surfaces as the error code 404 and an error message of "RenameDestinationParentPathNotFound" To mitigate this issue, when a request fails with this response. the ABFS client issues a HEAD call on the source file and then retries the rename operation again ABFS filesystem statistics track when this occurs with new counters rename_recovery metadata_incomplete_rename_failures rename_path_attempts This is very rare occurrence and appears to be triggered under certain heavy load conditions, just as with HADOOP-18163. Contributed by Mehakmeet Singh.
This commit is contained in:
parent
25f8bdcd21
commit
823f5ee0d4
|
@ -87,7 +87,11 @@ public class AbfsCountersImpl implements AbfsCounters {
|
|||
BYTES_RECEIVED,
|
||||
READ_THROTTLES,
|
||||
WRITE_THROTTLES,
|
||||
SERVER_UNAVAILABLE
|
||||
SERVER_UNAVAILABLE,
|
||||
RENAME_RECOVERY,
|
||||
METADATA_INCOMPLETE_RENAME_FAILURES,
|
||||
RENAME_PATH_ATTEMPTS
|
||||
|
||||
};
|
||||
|
||||
private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
|
||||
|
|
|
@ -100,7 +100,16 @@ public enum AbfsStatistic {
|
|||
AbfsHttpConstants.HTTP_METHOD_PATCH),
|
||||
HTTP_POST_REQUEST(StoreStatisticNames.ACTION_HTTP_POST_REQUEST,
|
||||
"Time taken to complete a POST request",
|
||||
AbfsHttpConstants.HTTP_METHOD_POST);
|
||||
AbfsHttpConstants.HTTP_METHOD_POST),
|
||||
|
||||
// Rename recovery
|
||||
RENAME_RECOVERY("rename_recovery",
|
||||
"Number of times Rename recoveries happened"),
|
||||
METADATA_INCOMPLETE_RENAME_FAILURES("metadata_incomplete_rename_failures",
|
||||
"Number of times rename operation failed due to metadata being "
|
||||
+ "incomplete"),
|
||||
RENAME_PATH_ATTEMPTS("rename_path_attempts",
|
||||
"Number of times we attempt to rename a path internally");
|
||||
|
||||
private String statName;
|
||||
private String statDescription;
|
||||
|
|
|
@ -1576,7 +1576,7 @@ public class AzureBlobFileSystem extends FileSystem
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
AzureBlobFileSystemStore getAbfsStore() {
|
||||
public AzureBlobFileSystemStore getAbfsStore() {
|
||||
return abfsStore;
|
||||
}
|
||||
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Listenable
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -97,6 +96,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
|||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
|
@ -132,6 +132,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
|
|||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYPHEN;
|
||||
|
@ -919,18 +921,19 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
|
||||
do {
|
||||
try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) {
|
||||
final Pair<AbfsRestOperation, Boolean> pair =
|
||||
final AbfsClientRenameResult abfsClientRenameResult =
|
||||
client.renamePath(sourceRelativePath, destinationRelativePath,
|
||||
continuation, tracingContext, sourceEtag);
|
||||
continuation, tracingContext, sourceEtag, false);
|
||||
|
||||
AbfsRestOperation op = pair.getLeft();
|
||||
AbfsRestOperation op = abfsClientRenameResult.getOp();
|
||||
perfInfo.registerResult(op.getResult());
|
||||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
|
||||
perfInfo.registerSuccess(true);
|
||||
countAggregate++;
|
||||
shouldContinue = continuation != null && !continuation.isEmpty();
|
||||
// update the recovery flag.
|
||||
recovered |= pair.getRight();
|
||||
recovered |= abfsClientRenameResult.isRenameRecovered();
|
||||
populateRenameRecoveryStatistics(abfsClientRenameResult);
|
||||
if (!shouldContinue) {
|
||||
perfInfo.registerAggregates(startAggregate, countAggregate);
|
||||
}
|
||||
|
@ -1905,7 +1908,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
AbfsClient getClient() {
|
||||
public AbfsClient getClient() {
|
||||
return this.client;
|
||||
}
|
||||
|
||||
|
@ -1973,4 +1976,19 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
|
|||
}
|
||||
return etag;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment rename recovery based counters in IOStatistics.
|
||||
*
|
||||
* @param abfsClientRenameResult Result of an ABFS rename operation.
|
||||
*/
|
||||
private void populateRenameRecoveryStatistics(
|
||||
AbfsClientRenameResult abfsClientRenameResult) {
|
||||
if (abfsClientRenameResult.isRenameRecovered()) {
|
||||
abfsCounters.incrementCounter(RENAME_RECOVERY, 1);
|
||||
}
|
||||
if (abfsClientRenameResult.isIncompleteMetadataState()) {
|
||||
abfsCounters.incrementCounter(METADATA_INCOMPLETE_RENAME_FAILURES, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.concurrent.ThreadFactory;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.store.LogExactlyOnce;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
|
||||
|
@ -51,7 +52,6 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFact
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
|
||||
|
@ -69,6 +69,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
|||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
|
||||
import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||
|
@ -76,6 +77,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.S
|
|||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* AbfsClient.
|
||||
|
@ -102,6 +104,10 @@ public class AbfsClient implements Closeable {
|
|||
|
||||
private final ListeningScheduledExecutorService executorService;
|
||||
|
||||
/** logging the rename failure if metadata is in an incomplete state. */
|
||||
private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE =
|
||||
new LogExactlyOnce(LOG);
|
||||
|
||||
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final AbfsClientContext abfsClientContext)
|
||||
|
@ -496,15 +502,19 @@ public class AbfsClient implements Closeable {
|
|||
* @param continuation continuation.
|
||||
* @param tracingContext trace context
|
||||
* @param sourceEtag etag of source file. may be null or empty
|
||||
* @return pair of (the rename operation, flag indicating recovery took place)
|
||||
* @param isMetadataIncompleteState was there a rename failure due to
|
||||
* incomplete metadata state?
|
||||
* @return AbfsClientRenameResult result of rename operation indicating the
|
||||
* AbfsRest operation, rename recovery and incomplete metadata state failure.
|
||||
* @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
|
||||
*/
|
||||
public Pair<AbfsRestOperation, Boolean> renamePath(
|
||||
public AbfsClientRenameResult renamePath(
|
||||
final String source,
|
||||
final String destination,
|
||||
final String continuation,
|
||||
final TracingContext tracingContext,
|
||||
final String sourceEtag)
|
||||
final String sourceEtag,
|
||||
boolean isMetadataIncompleteState)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
|
||||
|
@ -531,13 +541,45 @@ public class AbfsClient implements Closeable {
|
|||
url,
|
||||
requestHeaders);
|
||||
try {
|
||||
incrementAbfsRenamePath();
|
||||
op.execute(tracingContext);
|
||||
return Pair.of(op, false);
|
||||
// AbfsClientResult contains the AbfsOperation, If recovery happened or
|
||||
// not, and the incompleteMetaDataState is true or false.
|
||||
// If we successfully rename a path and isMetadataIncompleteState was
|
||||
// true, then rename was recovered, else it didn't, this is why
|
||||
// isMetadataIncompleteState is used for renameRecovery(as the 2nd param).
|
||||
return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
// ref: HADOOP-18242. Rename failure occurring due to a rare case of
|
||||
// tracking metadata being in incomplete state.
|
||||
if (op.getResult().getStorageErrorCode()
|
||||
.equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())
|
||||
&& !isMetadataIncompleteState) {
|
||||
//Logging
|
||||
ABFS_METADATA_INCOMPLETE_RENAME_FAILURE
|
||||
.info("Rename Failure attempting to resolve tracking metadata state and retrying.");
|
||||
|
||||
// Doing a HEAD call resolves the incomplete metadata state and
|
||||
// then we can retry the rename operation.
|
||||
AbfsRestOperation sourceStatusOp = getPathStatus(source, false,
|
||||
tracingContext);
|
||||
isMetadataIncompleteState = true;
|
||||
// Extract the sourceEtag, using the status Op, and set it
|
||||
// for future rename recovery.
|
||||
AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult();
|
||||
String sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult);
|
||||
renamePath(source, destination, continuation, tracingContext,
|
||||
sourceEtagAfterFailure, isMetadataIncompleteState);
|
||||
}
|
||||
// if we get out of the condition without a successful rename, then
|
||||
// it isn't metadata incomplete state issue.
|
||||
isMetadataIncompleteState = false;
|
||||
|
||||
boolean etagCheckSucceeded = renameIdempotencyCheckOp(
|
||||
source,
|
||||
sourceEtag, op, destination, tracingContext);
|
||||
|
@ -546,10 +588,14 @@ public class AbfsClient implements Closeable {
|
|||
// throw back the exception
|
||||
throw e;
|
||||
}
|
||||
return Pair.of(op, true);
|
||||
return new AbfsClientRenameResult(op, true, isMetadataIncompleteState);
|
||||
}
|
||||
}
|
||||
|
||||
private void incrementAbfsRenamePath() {
|
||||
abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the rename request failure is post a retry and if earlier rename
|
||||
* request might have succeeded at back-end.
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
/**
|
||||
* A class to store the Result of an AbfsClient rename operation, signifying the
|
||||
* AbfsRestOperation result and the rename recovery.
|
||||
*/
|
||||
public class AbfsClientRenameResult {
|
||||
|
||||
/** Abfs Rest Operation. */
|
||||
private final AbfsRestOperation op;
|
||||
/** Flag indicating recovery took place. */
|
||||
private final boolean renameRecovered;
|
||||
/** Abfs storage tracking metadata is in an incomplete state. */
|
||||
private final boolean isIncompleteMetadataState;
|
||||
|
||||
/**
|
||||
* Constructing an ABFS rename operation result.
|
||||
* @param op The AbfsRestOperation.
|
||||
* @param renameRecovered Did rename recovery took place?
|
||||
* @param isIncompleteMetadataState Did the rename failed due to incomplete
|
||||
* metadata state and had to be retried?
|
||||
*/
|
||||
public AbfsClientRenameResult(
|
||||
AbfsRestOperation op,
|
||||
boolean renameRecovered,
|
||||
boolean isIncompleteMetadataState) {
|
||||
this.op = op;
|
||||
this.renameRecovered = renameRecovered;
|
||||
this.isIncompleteMetadataState = isIncompleteMetadataState;
|
||||
}
|
||||
|
||||
public AbfsRestOperation getOp() {
|
||||
return op;
|
||||
}
|
||||
|
||||
public boolean isRenameRecovered() {
|
||||
return renameRecovered;
|
||||
}
|
||||
|
||||
public boolean isIncompleteMetadataState() {
|
||||
return isIncompleteMetadataState;
|
||||
}
|
||||
}
|
|
@ -401,8 +401,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
|||
fs.create(new Path(src)).close();
|
||||
AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
|
||||
.renamePath(src, "/testABC" + "/abc.txt", null,
|
||||
getTestTracingContext(fs, false), null)
|
||||
.getLeft();
|
||||
getTestTracingContext(fs, false), null, false)
|
||||
.getOp();
|
||||
AbfsHttpOperation result = abfsHttpRestOperation.getResult();
|
||||
String url = result.getMaskedUrl();
|
||||
String encodedUrl = result.getMaskedEncodedUrl();
|
||||
|
@ -419,7 +419,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
|
|||
intercept(IOException.class, "sig=XXXX",
|
||||
() -> getFileSystem().getAbfsClient()
|
||||
.renamePath("testABC/test.xt", "testABC/abc.txt", null,
|
||||
getTestTracingContext(getFileSystem(), false), null));
|
||||
getTestTracingContext(getFileSystem(), false), null, false));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -30,12 +30,17 @@ import org.junit.Test;
|
|||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||
|
||||
/**
|
||||
* Test rename operation.
|
||||
|
@ -167,4 +172,30 @@ public class ITestAzureBlobFileSystemRename extends
|
|||
new Path(testDir2 + "/test1/test2/test3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameWithNoDestinationParentDir() throws Exception {
|
||||
describe("Verifying the expected behaviour of ABFS rename when "
|
||||
+ "destination parent Dir doesn't exist.");
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
Path sourcePath = path(getMethodName());
|
||||
Path destPath = new Path("falseParent", "someChildFile");
|
||||
|
||||
byte[] data = dataset(1024, 'a', 'z');
|
||||
writeDataset(fs, sourcePath, data, data.length, 1024, true);
|
||||
|
||||
// Verify that renaming on a destination with no parent dir wasn't
|
||||
// successful.
|
||||
assertFalse("Rename result expected to be false with no Parent dir",
|
||||
fs.rename(sourcePath, destPath));
|
||||
|
||||
// Verify that metadata was in an incomplete state after the rename
|
||||
// failure, and we retired the rename once more.
|
||||
IOStatistics ioStatistics = fs.getIOStatistics();
|
||||
IOStatisticAssertions.assertThatStatisticCounter(ioStatistics,
|
||||
RENAME_PATH_ATTEMPTS.getStatName())
|
||||
.describedAs("There should be 2 rename attempts if metadata "
|
||||
+ "incomplete state failure is hit")
|
||||
.isEqualTo(2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -526,8 +526,8 @@ public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
|
|||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
.renamePath(testFileName, newName, null,
|
||||
getTestTracingContext(fs, false), null)
|
||||
.getLeft();
|
||||
getTestTracingContext(fs, false), null, false)
|
||||
.getOp();
|
||||
assertCPKHeaders(abfsRestOperation, false);
|
||||
assertNoCPKResponseHeadersPresent(abfsRestOperation);
|
||||
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Testing Abfs Rename recovery using Mockito.
|
||||
*/
|
||||
public class TestAbfsRenameRetryRecovery extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestAbfsRenameRetryRecovery.class);
|
||||
|
||||
public TestAbfsRenameRetryRecovery() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Mock the AbfsClient to run a metadata incomplete scenario with recovery
|
||||
* rename.
|
||||
*/
|
||||
@Test
|
||||
public void testRenameFailuresDueToIncompleteMetadata() throws Exception {
|
||||
String sourcePath = getMethodName() + "Source";
|
||||
String destNoParentPath = "/NoParent/Dest";
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
AbfsCounters abfsCounters = mock(AbfsCounters.class);
|
||||
when(mockClient.getAbfsCounters()).thenReturn(abfsCounters);
|
||||
// SuccessFul Result.
|
||||
AbfsRestOperation successOp =
|
||||
new AbfsRestOperation(AbfsRestOperationType.RenamePath, mockClient,
|
||||
HTTP_METHOD_PUT, null, null);
|
||||
AbfsClientRenameResult successResult = mock(AbfsClientRenameResult.class);
|
||||
doReturn(successOp).when(successResult).getOp();
|
||||
when(successResult.isIncompleteMetadataState()).thenReturn(false);
|
||||
|
||||
// Failed Result.
|
||||
AbfsRestOperation failedOp = new AbfsRestOperation(AbfsRestOperationType.RenamePath, mockClient,
|
||||
HTTP_METHOD_PUT, null, null);
|
||||
AbfsClientRenameResult recoveredMetaDataIncompleteResult =
|
||||
mock(AbfsClientRenameResult.class);
|
||||
|
||||
doReturn(failedOp).when(recoveredMetaDataIncompleteResult).getOp();
|
||||
when(recoveredMetaDataIncompleteResult.isIncompleteMetadataState()).thenReturn(true);
|
||||
|
||||
// No destination Parent dir exception.
|
||||
AzureBlobFileSystemException destParentNotFound
|
||||
= getMockAbfsRestOperationException(
|
||||
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getStatusCode(),
|
||||
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode());
|
||||
|
||||
// We need to throw an exception once a rename is triggered with
|
||||
// destination having no parent, but after a retry it needs to succeed.
|
||||
when(mockClient.renamePath(sourcePath, destNoParentPath, null, null,
|
||||
null, false))
|
||||
.thenThrow(destParentNotFound)
|
||||
.thenReturn(recoveredMetaDataIncompleteResult);
|
||||
|
||||
// Dest parent not found exc. to be raised.
|
||||
intercept(AzureBlobFileSystemException.class,
|
||||
() -> mockClient.renamePath(sourcePath,
|
||||
destNoParentPath, null, null,
|
||||
null, false));
|
||||
|
||||
AbfsClientRenameResult resultOfSecondRenameCall =
|
||||
mockClient.renamePath(sourcePath,
|
||||
destNoParentPath, null, null,
|
||||
null, false);
|
||||
|
||||
// the second rename call should be the recoveredResult due to
|
||||
// metaDataIncomplete
|
||||
Assertions.assertThat(resultOfSecondRenameCall)
|
||||
.describedAs("This result should be recovered result due to MetaData "
|
||||
+ "being in incomplete state")
|
||||
.isSameAs(recoveredMetaDataIncompleteResult);
|
||||
// Verify Incomplete metadata state happened for our second rename call.
|
||||
assertTrue("Metadata incomplete state should be true if a rename is "
|
||||
+ "retried after no Parent directory is found",
|
||||
resultOfSecondRenameCall.isIncompleteMetadataState());
|
||||
|
||||
|
||||
// Verify renamePath occurred two times implying a retry was attempted.
|
||||
verify(mockClient, times(2))
|
||||
.renamePath(sourcePath, destNoParentPath, null, null, null, false);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to create an AbfsRestOperationException.
|
||||
* @param statusCode status code to be used.
|
||||
* @param errorCode error code to be used.
|
||||
* @return the exception.
|
||||
*/
|
||||
private AbfsRestOperationException getMockAbfsRestOperationException(
|
||||
int statusCode, String errorCode) {
|
||||
return new AbfsRestOperationException(statusCode, errorCode,
|
||||
"No Parent found for the Destination file",
|
||||
new Exception());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue