diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index f747bd068cc..e1ea75e4756 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -336,10 +336,19 @@ public class AbfsClient implements Closeable { url, requestHeaders); Instant renameRequestStartTime = Instant.now(); - op.execute(); - - if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) { - return renameIdempotencyCheckOp(renameRequestStartTime, op, destination); + try { + op.execute(); + } catch (AzureBlobFileSystemException e) { + final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp( + renameRequestStartTime, op, destination); + if (idempotencyOp.getResult().getStatusCode() + == op.getResult().getStatusCode()) { + // idempotency did not return different result + // throw back the exception + throw e; + } else { + return idempotencyOp; + } } return op; @@ -369,14 +378,21 @@ public class AbfsClient implements Closeable { // exists. Check on destination status and if it has a recent LMT timestamp. // If yes, return success, else fall back to original rename request failure response. - final AbfsRestOperation destStatusOp = getPathStatus(destination, false); - if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) { - String lmt = destStatusOp.getResult().getResponseHeader( - HttpHeaderConfigurations.LAST_MODIFIED); + try { + final AbfsRestOperation destStatusOp = getPathStatus(destination, + false); + if (destStatusOp.getResult().getStatusCode() + == HttpURLConnection.HTTP_OK) { + String lmt = destStatusOp.getResult().getResponseHeader( + HttpHeaderConfigurations.LAST_MODIFIED); - if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) { - return destStatusOp; + if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) { + return destStatusOp; + } } + } catch (AzureBlobFileSystemException e) { + // GetFileStatus on the destination failed, return original op + return op; } } @@ -570,10 +586,18 @@ public class AbfsClient implements Closeable { HTTP_METHOD_DELETE, url, requestHeaders); + try { op.execute(); - - if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) { - return deleteIdempotencyCheckOp(op); + } catch (AzureBlobFileSystemException e) { + final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op); + if (idempotencyOp.getResult().getStatusCode() + == op.getResult().getStatusCode()) { + // idempotency did not return different result + // throw back the exception + throw e; + } else { + return idempotencyOp; + } } return op; @@ -822,7 +846,8 @@ public class AbfsClient implements Closeable { return createRequestUrl(EMPTY_STRING, query); } - private URL createRequestUrl(final String path, final String query) + @VisibleForTesting + protected URL createRequestUrl(final String path, final String query) throws AzureBlobFileSystemException { final String base = baseUrl.toString(); String encodedPath = path; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index f3986d4b1f3..936267aa507 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -24,6 +24,7 @@ import java.net.URL; import java.net.UnknownHostException; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,7 +171,8 @@ public class AbfsRestOperation { * Executes the REST operation with retry, by issuing one or more * HTTP operations. */ - void execute() throws AzureBlobFileSystemException { + @VisibleForTesting + public void execute() throws AzureBlobFileSystemException { // see if we have latency reports from the previous requests String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency(); if (latencyHeader != null && !latencyHeader.isEmpty()) { @@ -181,8 +183,9 @@ public class AbfsRestOperation { retryCount = 0; LOG.debug("First execution of REST operation - {}", operationType); - while (!executeHttpOperation(retryCount++)) { + while (!executeHttpOperation(retryCount)) { try { + ++retryCount; LOG.debug("Retrying REST operation {}. RetryCount = {}", operationType, retryCount); Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index e2973968e22..2f2a6191ed4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions; import org.junit.Assume; import org.junit.Test; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; @@ -38,9 +39,12 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -168,7 +172,8 @@ public class ITestAzureBlobFileSystemDelete extends // Set retryCount to non-zero when(op.isARetriedRequest()).thenReturn(true); - // Mock instance of Http Operation response. This will return HTTP:Not Found + // Case 1: Mock instance of Http Operation response. This will return + // HTTP:Not Found AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class); when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND); @@ -181,6 +186,64 @@ public class ITestAzureBlobFileSystemDelete extends .describedAs( "Delete is considered idempotent by default and should return success.") .isEqualTo(HTTP_OK); + + // Case 2: Mock instance of Http Operation response. This will return + // HTTP:Bad Request + AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class); + when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST); + + // Mock delete response to 400 + when(op.getResult()).thenReturn(http400Op); + + Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op) + .getResult() + .getStatusCode()) + .describedAs( + "Idempotency check to happen only for HTTP 404 response.") + .isEqualTo(HTTP_BAD_REQUEST); + + } + + @Test + public void testDeleteIdempotencyTriggerHttp404() throws Exception { + + final AzureBlobFileSystem fs = getFileSystem(); + AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext( + fs.getAbfsStore().getClient(), + this.getConfiguration()); + + // Case 1: Not a retried case should throw error back + intercept(AbfsRestOperationException.class, + () -> client.deletePath( + "/NonExistingPath", + false, + null)); + + // mock idempotency check to mimic retried case + AbfsClient mockClient = TestAbfsClient.getMockAbfsClient( + fs.getAbfsStore().getClient(), + this.getConfiguration()); + + // Case 2: Mimic retried case + // Idempotency check on Delete always returns success + AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class); + AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class); + when(http200Op.getStatusCode()).thenReturn(HTTP_OK); + when(idempotencyRetOp.getResult()).thenReturn(http200Op); + + doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any()); + when(mockClient.deletePath("/NonExistingPath", false, + null)).thenCallRealMethod(); + + Assertions.assertThat(mockClient.deletePath( + "/NonExistingPath", + false, + null) + .getResult() + .getStatusCode()) + .describedAs("Idempotency check reports successful " + + "delete. 200OK should be returned") + .isEqualTo(idempotencyRetOp.getResult().getStatusCode()); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 7e03ee5bcc2..2adf70ca645 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.Assert; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; @@ -42,6 +43,8 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static java.util.UUID.randomUUID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -51,6 +54,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotE import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + /** * Test rename operation. */ @@ -77,6 +82,16 @@ public class ITestAzureBlobFileSystemRename extends assertPathDoesNotExist(fs, "expected renamed", src); } + @Test + public void testRenameWithPreExistingDestination() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path src = path("renameSrc"); + touch(src); + Path dest = path("renameDest"); + touch(dest); + assertRenameOutcome(fs, src, dest, false); + } + @Test public void testRenameFileUnderDir() throws Exception { final AzureBlobFileSystem fs = getFileSystem(); @@ -197,6 +212,59 @@ public class ITestAzureBlobFileSystemRename extends + "TimespanForIdentifyingRecentOperationThroughLMT."); } + @Test + public void testRenameIdempotencyTriggerHttpNotFound() throws Exception { + AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class); + when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND); + + AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class); + when(http200Op.getStatusCode()).thenReturn(HTTP_OK); + + // Check 1 where idempotency check fails to find dest path + // Rename should throw exception + testRenameIdempotencyTriggerChecks(http404Op); + + // Check 2 where idempotency check finds the dest path + // Renam will be successful + testRenameIdempotencyTriggerChecks(http200Op); + } + + private void testRenameIdempotencyTriggerChecks( + AbfsHttpOperation idempotencyRetHttpOp) throws Exception { + + final AzureBlobFileSystem fs = getFileSystem(); + AbfsClient client = TestAbfsClient.getMockAbfsClient( + fs.getAbfsStore().getClient(), + this.getConfiguration()); + + AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class); + when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp); + doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(), + any(), any()); + when(client.renamePath(any(), any(), any())).thenCallRealMethod(); + + // rename on non-existing source file will trigger idempotency check + if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) { + // idempotency check found that destination exists and is recently created + Assertions.assertThat(client.renamePath( + "/NonExistingsourcepath", + "/destpath", + null) + .getResult() + .getStatusCode()) + .describedAs("Idempotency check reports recent successful " + + "rename. 200OK should be returned") + .isEqualTo(idempotencyRetOp.getResult().getStatusCode()); + } else { + // rename dest not found. Original exception should be returned. + intercept(AbfsRestOperationException.class, + () -> client.renamePath( + "/NonExistingsourcepath", + "/destpath", + "")); + } + } + private void testRenameTimeout( int renameRequestStatus, int renameIdempotencyCheckStatus, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 8197e7e2020..bab02c09c74 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.lang.reflect.Field; import java.net.MalformedURLException; import java.net.URL; import java.util.regex.Pattern; @@ -33,6 +34,9 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION; @@ -271,4 +275,76 @@ public final class TestAbfsClient { return testClient; } + + public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, + AbfsConfiguration abfsConfig) + throws IOException, NoSuchFieldException, IllegalAccessException { + AuthType currentAuthType = abfsConfig.getAuthType( + abfsConfig.getAccountName()); + + org.junit.Assume.assumeTrue( + (currentAuthType == AuthType.SharedKey) + || (currentAuthType == AuthType.OAuth)); + + AbfsClient client = mock(AbfsClient.class); + AbfsPerfTracker tracker = new AbfsPerfTracker( + "test", + abfsConfig.getAccountName(), + abfsConfig); + + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.getAuthType()).thenReturn(currentAuthType); + when(client.getRetryPolicy()).thenReturn( + new ExponentialRetryPolicy(1)); + + when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); + when(client.createRequestUrl(any(), any())).thenCallRealMethod(); + when(client.getAccessToken()).thenCallRealMethod(); + when(client.getSharedKeyCredentials()).thenCallRealMethod(); + when(client.createDefaultHeaders()).thenCallRealMethod(); + + // override baseurl + Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl"); + baseUrlField.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl()); + + // override auth provider + if (currentAuthType == AuthType.SharedKey) { + Field sharedKeyCredsField = AbfsClient.class.getDeclaredField( + "sharedKeyCredentials"); + sharedKeyCredsField.setAccessible(true); + modifiersField.setInt(sharedKeyCredsField, + sharedKeyCredsField.getModifiers() + & ~java.lang.reflect.Modifier.FINAL); + sharedKeyCredsField.set(client, new SharedKeyCredentials( + abfsConfig.getAccountName().substring(0, + abfsConfig.getAccountName().indexOf(DOT)), + abfsConfig.getStorageAccountKey())); + } else { + Field tokenProviderField = AbfsClient.class.getDeclaredField( + "tokenProvider"); + tokenProviderField.setAccessible(true); + modifiersField.setInt(tokenProviderField, + tokenProviderField.getModifiers() + & ~java.lang.reflect.Modifier.FINAL); + tokenProviderField.set(client, abfsConfig.getTokenProvider()); + } + + // override user agent + String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " + + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + + "UNKNOWN/UNKNOWN) MSFT"; + Field userAgentField = AbfsClient.class.getDeclaredField( + "userAgent"); + userAgentField.setAccessible(true); + modifiersField.setInt(userAgentField, + userAgentField.getModifiers() + & ~java.lang.reflect.Modifier.FINAL); + userAgentField.set(client, userAgent); + + return client; + } }