HADOOP-18002. ABFS rename idempotency broken -remove recovery (#3641)
Cut modtime-based rename recovery as object modification time is not updated during rename operation. Applications will have to use etag API of HADOOP-17979 and implement it themselves. Why not do the HEAD and etag recovery in ABFS client? Cuts the IO capacity in half so kills job commit performance. The manifest committer of MAPREDUCE-7341 will do this recovery and act as the reference implementation of the algorithm. Contributed by: Steve Loughran
This commit is contained in:
parent
5626734a36
commit
3391b69692
|
@ -28,7 +28,6 @@ import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -63,7 +62,6 @@ import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||||
|
@ -507,75 +505,8 @@ public class AbfsClient implements Closeable {
|
||||||
HTTP_METHOD_PUT,
|
HTTP_METHOD_PUT,
|
||||||
url,
|
url,
|
||||||
requestHeaders);
|
requestHeaders);
|
||||||
Instant renameRequestStartTime = Instant.now();
|
// no attempt at recovery using timestamps as it was not reliable.
|
||||||
try {
|
|
||||||
op.execute(tracingContext);
|
op.execute(tracingContext);
|
||||||
} catch (AzureBlobFileSystemException e) {
|
|
||||||
// If we have no HTTP response, throw the original exception.
|
|
||||||
if (!op.hasResult()) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
|
|
||||||
renameRequestStartTime, op, destination, tracingContext);
|
|
||||||
if (idempotencyOp.getResult().getStatusCode()
|
|
||||||
== op.getResult().getStatusCode()) {
|
|
||||||
// idempotency did not return different result
|
|
||||||
// throw back the exception
|
|
||||||
throw e;
|
|
||||||
} else {
|
|
||||||
return idempotencyOp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return op;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the rename request failure is post a retry and if earlier rename
|
|
||||||
* request might have succeeded at back-end.
|
|
||||||
*
|
|
||||||
* If there is a parallel rename activity happening from any other store
|
|
||||||
* interface, the logic here will detect the rename to have happened due to
|
|
||||||
* the one initiated from this ABFS filesytem instance as it was retried. This
|
|
||||||
* should be a corner case hence going ahead with LMT check.
|
|
||||||
* @param renameRequestStartTime startTime for the rename request
|
|
||||||
* @param op Rename request REST operation response with non-null HTTP response
|
|
||||||
* @param destination rename destination path
|
|
||||||
* @param tracingContext Tracks identifiers for request header
|
|
||||||
* @return REST operation response post idempotency check
|
|
||||||
* @throws AzureBlobFileSystemException if GetFileStatus hits any exception
|
|
||||||
*/
|
|
||||||
public AbfsRestOperation renameIdempotencyCheckOp(
|
|
||||||
final Instant renameRequestStartTime,
|
|
||||||
final AbfsRestOperation op,
|
|
||||||
final String destination,
|
|
||||||
TracingContext tracingContext) throws AzureBlobFileSystemException {
|
|
||||||
Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
|
|
||||||
if ((op.isARetriedRequest())
|
|
||||||
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) {
|
|
||||||
// Server has returned HTTP 404, which means rename source no longer
|
|
||||||
// exists. Check on destination status and if it has a recent LMT timestamp.
|
|
||||||
// If yes, return success, else fall back to original rename request failure response.
|
|
||||||
|
|
||||||
try {
|
|
||||||
final AbfsRestOperation destStatusOp = getPathStatus(destination,
|
|
||||||
false, tracingContext);
|
|
||||||
if (destStatusOp.getResult().getStatusCode()
|
|
||||||
== HttpURLConnection.HTTP_OK) {
|
|
||||||
String lmt = destStatusOp.getResult().getResponseHeader(
|
|
||||||
HttpHeaderConfigurations.LAST_MODIFIED);
|
|
||||||
|
|
||||||
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
|
||||||
LOG.debug("Returning success response from rename idempotency logic");
|
|
||||||
return destStatusOp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (AzureBlobFileSystemException e) {
|
|
||||||
// GetFileStatus on the destination failed, return original op
|
|
||||||
return op;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return op;
|
return op;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,6 +192,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
|
|
||||||
// Mock delete response to 404
|
// Mock delete response to 404
|
||||||
when(op.getResult()).thenReturn(http404Op);
|
when(op.getResult()).thenReturn(http404Op);
|
||||||
|
when(op.hasResult()).thenReturn(true);
|
||||||
|
|
||||||
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
||||||
.getResult()
|
.getResult()
|
||||||
|
@ -207,6 +208,7 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
|
|
||||||
// Mock delete response to 400
|
// Mock delete response to 400
|
||||||
when(op.getResult()).thenReturn(http400Op);
|
when(op.getResult()).thenReturn(http400Op);
|
||||||
|
when(op.hasResult()).thenReturn(true);
|
||||||
|
|
||||||
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
||||||
.getResult()
|
.getResult()
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.azurebfs;
|
package org.apache.hadoop.fs.azurebfs;
|
||||||
|
|
||||||
import java.time.Instant;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
@ -26,36 +25,17 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.assertj.core.api.Assertions;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
|
||||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
||||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
|
||||||
import static java.net.HttpURLConnection.HTTP_OK;
|
|
||||||
import static java.util.UUID.randomUUID;
|
|
||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test rename operation.
|
* Test rename operation.
|
||||||
|
@ -63,9 +43,6 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
public class ITestAzureBlobFileSystemRename extends
|
public class ITestAzureBlobFileSystemRename extends
|
||||||
AbstractAbfsIntegrationTest {
|
AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
private static final int REDUCED_RETRY_COUNT = 1;
|
|
||||||
private static final int REDUCED_MAX_BACKOFF_INTERVALS_MS = 5000;
|
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemRename() throws Exception {
|
public ITestAzureBlobFileSystemRename() throws Exception {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -190,152 +167,4 @@ public class ITestAzureBlobFileSystemRename extends
|
||||||
new Path(testDir2 + "/test1/test2/test3"));
|
new Path(testDir2 + "/test1/test2/test3"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenameRetryFailureAsHTTP400() throws Exception {
|
|
||||||
// Rename failed as Bad Request
|
|
||||||
// RenameIdempotencyCheck should throw back the rename failure Op
|
|
||||||
testRenameTimeout(HTTP_BAD_REQUEST, HTTP_BAD_REQUEST, false,
|
|
||||||
"renameIdempotencyCheckOp should return rename BadRequest "
|
|
||||||
+ "response itself.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenameRetryFailureAsHTTP404() throws Exception {
|
|
||||||
// Rename failed as FileNotFound and the destination LMT is
|
|
||||||
// within TimespanForIdentifyingRecentOperationThroughLMT
|
|
||||||
testRenameTimeout(HTTP_NOT_FOUND, HTTP_OK, false,
|
|
||||||
"Rename should return success response because the destination "
|
|
||||||
+ "path is present and its LMT is within "
|
|
||||||
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenameRetryFailureWithDestOldLMT() throws Exception {
|
|
||||||
// Rename failed as FileNotFound and the destination LMT is
|
|
||||||
// older than TimespanForIdentifyingRecentOperationThroughLMT
|
|
||||||
testRenameTimeout(HTTP_NOT_FOUND, HTTP_NOT_FOUND, true,
|
|
||||||
"Rename should return original rename failure response "
|
|
||||||
+ "because the destination path LMT is older than "
|
|
||||||
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenameIdempotencyTriggerHttpNotFound() throws Exception {
|
|
||||||
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
|
||||||
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
|
||||||
|
|
||||||
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
|
|
||||||
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
|
|
||||||
|
|
||||||
// Check 1 where idempotency check fails to find dest path
|
|
||||||
// Rename should throw exception
|
|
||||||
testRenameIdempotencyTriggerChecks(http404Op);
|
|
||||||
|
|
||||||
// Check 2 where idempotency check finds the dest path
|
|
||||||
// Renam will be successful
|
|
||||||
testRenameIdempotencyTriggerChecks(http200Op);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testRenameIdempotencyTriggerChecks(
|
|
||||||
AbfsHttpOperation idempotencyRetHttpOp) throws Exception {
|
|
||||||
|
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
|
||||||
AbfsClient client = TestAbfsClient.getMockAbfsClient(
|
|
||||||
fs.getAbfsStore().getClient(),
|
|
||||||
this.getConfiguration());
|
|
||||||
|
|
||||||
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
|
|
||||||
when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp);
|
|
||||||
doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(),
|
|
||||||
any(), any(), any());
|
|
||||||
when(client.renamePath(any(), any(), any(), any())).thenCallRealMethod();
|
|
||||||
|
|
||||||
// rename on non-existing source file will trigger idempotency check
|
|
||||||
if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) {
|
|
||||||
// idempotency check found that destination exists and is recently created
|
|
||||||
Assertions.assertThat(client.renamePath(
|
|
||||||
"/NonExistingsourcepath",
|
|
||||||
"/destpath",
|
|
||||||
null,
|
|
||||||
getTestTracingContext(fs, true))
|
|
||||||
.getResult()
|
|
||||||
.getStatusCode())
|
|
||||||
.describedAs("Idempotency check reports recent successful "
|
|
||||||
+ "rename. 200OK should be returned")
|
|
||||||
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
|
|
||||||
} else {
|
|
||||||
// rename dest not found. Original exception should be returned.
|
|
||||||
intercept(AbfsRestOperationException.class,
|
|
||||||
() -> client.renamePath(
|
|
||||||
"/NonExistingsourcepath",
|
|
||||||
"/destpath",
|
|
||||||
"",
|
|
||||||
getTestTracingContext(fs, true)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testRenameTimeout(
|
|
||||||
int renameRequestStatus,
|
|
||||||
int renameIdempotencyCheckStatus,
|
|
||||||
boolean isOldOp,
|
|
||||||
String assertMessage) throws Exception {
|
|
||||||
// Config to reduce the retry and maxBackoff time for test run
|
|
||||||
AbfsConfiguration abfsConfig
|
|
||||||
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
|
||||||
getConfiguration(),
|
|
||||||
REDUCED_RETRY_COUNT, REDUCED_MAX_BACKOFF_INTERVALS_MS);
|
|
||||||
|
|
||||||
final AzureBlobFileSystem fs = getFileSystem();
|
|
||||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
|
||||||
AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
|
|
||||||
abfsClient,
|
|
||||||
abfsConfig);
|
|
||||||
|
|
||||||
// Mock instance of AbfsRestOperation
|
|
||||||
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
|
||||||
// Set retryCount to non-zero
|
|
||||||
when(op.isARetriedRequest()).thenReturn(true);
|
|
||||||
|
|
||||||
// Mock instance of Http Operation response. This will return HTTP:Bad Request
|
|
||||||
AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
|
|
||||||
when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);
|
|
||||||
|
|
||||||
// Mock instance of Http Operation response. This will return HTTP:Not Found
|
|
||||||
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
|
||||||
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
|
||||||
|
|
||||||
Path destinationPath = fs.makeQualified(
|
|
||||||
new Path("destination" + randomUUID().toString()));
|
|
||||||
|
|
||||||
Instant renameRequestStartTime = Instant.now();
|
|
||||||
|
|
||||||
if (renameRequestStatus == HTTP_BAD_REQUEST) {
|
|
||||||
when(op.getResult()).thenReturn(http400Op);
|
|
||||||
} else if (renameRequestStatus == HTTP_NOT_FOUND) {
|
|
||||||
// Create the file new.
|
|
||||||
fs.create(destinationPath).close();
|
|
||||||
when(op.getResult()).thenReturn(http404Op);
|
|
||||||
|
|
||||||
if (isOldOp) {
|
|
||||||
// instead of sleeping for DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS
|
|
||||||
// which will affect test run time
|
|
||||||
// will modify renameRequestStartTime to a future time so that
|
|
||||||
// lmt will qualify for old op
|
|
||||||
renameRequestStartTime = renameRequestStartTime.plusSeconds(
|
|
||||||
DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
Assertions.assertThat(testClient.renameIdempotencyCheckOp(
|
|
||||||
renameRequestStartTime,
|
|
||||||
op,
|
|
||||||
destinationPath.toUri().getPath(),
|
|
||||||
getTestTracingContext(fs, true))
|
|
||||||
.getResult()
|
|
||||||
.getStatusCode())
|
|
||||||
.describedAs(assertMessage)
|
|
||||||
.isEqualTo(renameIdempotencyCheckStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue