HADOOP-17407. ABFS: Fix NPE on delete idempotency flow
- Contributed by Sneha Vijayarajan
(cherry picked from commit 5ca1ea89b3
)
This commit is contained in:
parent
5f312a0d85
commit
f3a0ca66c2
|
@ -383,6 +383,7 @@ public class AbfsClient implements Closeable {
|
||||||
HttpHeaderConfigurations.LAST_MODIFIED);
|
HttpHeaderConfigurations.LAST_MODIFIED);
|
||||||
|
|
||||||
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
||||||
|
LOG.debug("Returning success response from rename idempotency logic");
|
||||||
return destStatusOp;
|
return destStatusOp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -450,6 +451,7 @@ public class AbfsClient implements Closeable {
|
||||||
String fileLength = destStatusOp.getResult().getResponseHeader(
|
String fileLength = destStatusOp.getResult().getResponseHeader(
|
||||||
HttpHeaderConfigurations.CONTENT_LENGTH);
|
HttpHeaderConfigurations.CONTENT_LENGTH);
|
||||||
if (length <= Long.parseLong(fileLength)) {
|
if (length <= Long.parseLong(fileLength)) {
|
||||||
|
LOG.debug("Returning success response from append blob idempotency code");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -627,6 +629,7 @@ public class AbfsClient implements Closeable {
|
||||||
op.getUrl(),
|
op.getUrl(),
|
||||||
op.getRequestHeaders());
|
op.getRequestHeaders());
|
||||||
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
||||||
|
LOG.debug("Returning success response from delete idempotency logic");
|
||||||
return successOp;
|
return successOp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,12 +86,23 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
||||||
private long sendRequestTimeMs;
|
private long sendRequestTimeMs;
|
||||||
private long recvResponseTimeMs;
|
private long recvResponseTimeMs;
|
||||||
|
|
||||||
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(final URL url,
|
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
|
||||||
final String method, final int httpStatus) {
|
final URL url,
|
||||||
return new AbfsHttpOperation(url, method, httpStatus);
|
final String method,
|
||||||
|
final int httpStatus) {
|
||||||
|
AbfsHttpOperationWithFixedResult httpOp
|
||||||
|
= new AbfsHttpOperationWithFixedResult(url, method, httpStatus);
|
||||||
|
return httpOp;
|
||||||
}
|
}
|
||||||
|
|
||||||
private AbfsHttpOperation(final URL url, final String method,
|
/**
|
||||||
|
* Constructor for FixedResult instance, avoiding connection init.
|
||||||
|
* @param url request url
|
||||||
|
* @param method Http method
|
||||||
|
* @param httpStatus HttpStatus
|
||||||
|
*/
|
||||||
|
protected AbfsHttpOperation(final URL url,
|
||||||
|
final String method,
|
||||||
final int httpStatus) {
|
final int httpStatus) {
|
||||||
this.isTraceEnabled = LOG.isTraceEnabled();
|
this.isTraceEnabled = LOG.isTraceEnabled();
|
||||||
this.url = url;
|
this.url = url;
|
||||||
|
@ -547,4 +558,24 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
||||||
return this.maskedEncodedUrl;
|
return this.maskedEncodedUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
|
||||||
|
/**
|
||||||
|
* Creates an instance to represent fixed results.
|
||||||
|
* This is used in idempotency handling.
|
||||||
|
*
|
||||||
|
* @param url The full URL including query string parameters.
|
||||||
|
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
|
||||||
|
* @param httpStatus StatusCode to hard set
|
||||||
|
*/
|
||||||
|
public AbfsHttpOperationWithFixedResult(final URL url,
|
||||||
|
final String method,
|
||||||
|
final int httpStatus) {
|
||||||
|
super(url, method, httpStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getResponseHeader(final String httpHeader) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -44,11 +46,14 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||||
import static java.net.HttpURLConnection.HTTP_OK;
|
import static java.net.HttpURLConnection.HTTP_OK;
|
||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.doCallRealMethod;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
@ -213,6 +218,12 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
this.getConfiguration());
|
this.getConfiguration());
|
||||||
|
|
||||||
// Case 1: Not a retried case should throw error back
|
// Case 1: Not a retried case should throw error back
|
||||||
|
// Add asserts at AzureBlobFileSystemStore and AbfsClient levels
|
||||||
|
intercept(AbfsRestOperationException.class,
|
||||||
|
() -> fs.getAbfsStore().delete(
|
||||||
|
new Path("/NonExistingPath"),
|
||||||
|
false));
|
||||||
|
|
||||||
intercept(AbfsRestOperationException.class,
|
intercept(AbfsRestOperationException.class,
|
||||||
() -> client.deletePath(
|
() -> client.deletePath(
|
||||||
"/NonExistingPath",
|
"/NonExistingPath",
|
||||||
|
@ -223,13 +234,22 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||||
fs.getAbfsStore().getClient(),
|
fs.getAbfsStore().getClient(),
|
||||||
this.getConfiguration());
|
this.getConfiguration());
|
||||||
|
AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
|
||||||
|
mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class, mockStore,
|
||||||
|
"client", mockClient);
|
||||||
|
mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class,
|
||||||
|
mockStore,
|
||||||
|
"abfsPerfTracker",
|
||||||
|
TestAbfsPerfTracker.getAPerfTrackerInstance(this.getConfiguration()));
|
||||||
|
doCallRealMethod().when(mockStore).delete(new Path("/NonExistingPath"), false);
|
||||||
|
|
||||||
// Case 2: Mimic retried case
|
// Case 2: Mimic retried case
|
||||||
// Idempotency check on Delete always returns success
|
// Idempotency check on Delete always returns success
|
||||||
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
|
AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
|
||||||
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
|
DeletePath, mockClient, HTTP_METHOD_DELETE,
|
||||||
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
|
TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
|
||||||
when(idempotencyRetOp.getResult()).thenReturn(http200Op);
|
TestAbfsClient.getTestRequestHeaders(mockClient));
|
||||||
|
idempotencyRetOp.hardSetResult(HTTP_OK);
|
||||||
|
|
||||||
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
||||||
when(mockClient.deletePath("/NonExistingPath", false,
|
when(mockClient.deletePath("/NonExistingPath", false,
|
||||||
|
@ -244,6 +264,9 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||||
.describedAs("Idempotency check reports successful "
|
.describedAs("Idempotency check reports successful "
|
||||||
+ "delete. 200OK should be returned")
|
+ "delete. 200OK should be returned")
|
||||||
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
|
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
|
||||||
|
|
||||||
|
// Call from AzureBlobFileSystemStore should not fail either
|
||||||
|
mockStore.delete(new Path("/NonExistingPath"), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.List;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -351,4 +352,49 @@ public final class TestAbfsClient {
|
||||||
field.set(client, fieldObject);
|
field.set(client, fieldObject);
|
||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper method to access private createRequestUrl method.
|
||||||
|
* @param client test AbfsClient instace
|
||||||
|
* @param path path to generate Url
|
||||||
|
* @return return store path url
|
||||||
|
* @throws AzureBlobFileSystemException
|
||||||
|
*/
|
||||||
|
public static URL getTestUrl(AbfsClient client, String path) throws
|
||||||
|
AzureBlobFileSystemException {
|
||||||
|
final AbfsUriQueryBuilder abfsUriQueryBuilder
|
||||||
|
= client.createDefaultUriQueryBuilder();
|
||||||
|
return client.createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper method to access private createDefaultHeaders method.
|
||||||
|
* @param client test AbfsClient instance
|
||||||
|
* @return List of AbfsHttpHeaders
|
||||||
|
*/
|
||||||
|
public static List<AbfsHttpHeader> getTestRequestHeaders(AbfsClient client) {
|
||||||
|
return client.createDefaultHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper method to create an AbfsRestOperation instance.
|
||||||
|
* @param type RestOpType
|
||||||
|
* @param client AbfsClient
|
||||||
|
* @param method HttpMethod
|
||||||
|
* @param url Test path url
|
||||||
|
* @param requestHeaders request headers
|
||||||
|
* @return instance of AbfsRestOperation
|
||||||
|
*/
|
||||||
|
public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
|
||||||
|
AbfsClient client,
|
||||||
|
String method,
|
||||||
|
URL url,
|
||||||
|
List<AbfsHttpHeader> requestHeaders) {
|
||||||
|
return new AbfsRestOperation(
|
||||||
|
type,
|
||||||
|
client,
|
||||||
|
method,
|
||||||
|
url,
|
||||||
|
requestHeaders);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,8 @@ import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -405,4 +407,15 @@ public final class TestAbfsPerfTracker {
|
||||||
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
|
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper method to create an AbfsPerfTracker instance.
|
||||||
|
* @param abfsConfig active test abfs config
|
||||||
|
* @return instance of AbfsPerfTracker
|
||||||
|
*/
|
||||||
|
public static AbfsPerfTracker getAPerfTrackerInstance(AbfsConfiguration abfsConfig) {
|
||||||
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
|
||||||
|
abfsConfig.getAccountName(), abfsConfig);
|
||||||
|
return tracker;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.utils;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Mock Helpers.
|
||||||
|
*/
|
||||||
|
public final class TestMockHelpers {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets a class field by reflection.
|
||||||
|
* @param type
|
||||||
|
* @param obj
|
||||||
|
* @param fieldName
|
||||||
|
* @param fieldObject
|
||||||
|
* @param <T>
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static <T> T setClassField(
|
||||||
|
Class<T> type,
|
||||||
|
final T obj,
|
||||||
|
final String fieldName,
|
||||||
|
Object fieldObject) throws Exception {
|
||||||
|
|
||||||
|
Field field = type.getDeclaredField(fieldName);
|
||||||
|
field.setAccessible(true);
|
||||||
|
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||||
|
modifiersField.setAccessible(true);
|
||||||
|
modifiersField.setInt(field,
|
||||||
|
field.getModifiers() & ~Modifier.FINAL);
|
||||||
|
field.set(obj, fieldObject);
|
||||||
|
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestMockHelpers() {
|
||||||
|
// Not called. - For checkstyle: HideUtilityClassConstructor
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue