HADOOP-17407. ABFS: Fix NPE on delete idempotency flow

- Contributed by Sneha Vijayarajan
This commit is contained in:
Sneha Vijayarajan 2021-01-02 23:52:10 +05:30 committed by GitHub
parent 1cd96e8dd8
commit 5ca1ea89b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 183 additions and 8 deletions

View File

@ -383,6 +383,7 @@ public class AbfsClient implements Closeable {
HttpHeaderConfigurations.LAST_MODIFIED);
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
LOG.debug("Returning success response from rename idempotency logic");
return destStatusOp;
}
}
@ -450,6 +451,7 @@ public class AbfsClient implements Closeable {
String fileLength = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (length <= Long.parseLong(fileLength)) {
LOG.debug("Returning success response from append blob idempotency code");
return true;
}
}
@ -627,6 +629,7 @@ public class AbfsClient implements Closeable {
op.getUrl(),
op.getRequestHeaders());
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
LOG.debug("Returning success response from delete idempotency logic");
return successOp;
}

View File

@ -86,12 +86,23 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private long sendRequestTimeMs;
private long recvResponseTimeMs;
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(final URL url,
final String method, final int httpStatus) {
return new AbfsHttpOperation(url, method, httpStatus);
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
final URL url,
final String method,
final int httpStatus) {
AbfsHttpOperationWithFixedResult httpOp
= new AbfsHttpOperationWithFixedResult(url, method, httpStatus);
return httpOp;
}
private AbfsHttpOperation(final URL url, final String method,
/**
* Constructor for FixedResult instance, avoiding connection init.
* @param url request url
* @param method Http method
* @param httpStatus HttpStatus
*/
protected AbfsHttpOperation(final URL url,
final String method,
final int httpStatus) {
this.isTraceEnabled = LOG.isTraceEnabled();
this.url = url;
@ -547,4 +558,24 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
return this.maskedEncodedUrl;
}
public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
/**
* Creates an instance to represent fixed results.
* This is used in idempotency handling.
*
* @param url The full URL including query string parameters.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param httpStatus StatusCode to hard set
*/
public AbfsHttpOperationWithFixedResult(final URL url,
final String method,
final int httpStatus) {
super(url, method, httpStatus);
}
@Override
public String getResponseHeader(final String httpHeader) {
return "";
}
}
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@ -44,11 +46,14 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@ -213,6 +218,12 @@ public class ITestAzureBlobFileSystemDelete extends
this.getConfiguration());
// Case 1: Not a retried case should throw error back
// Add asserts at AzureBlobFileSystemStore and AbfsClient levels
intercept(AbfsRestOperationException.class,
() -> fs.getAbfsStore().delete(
new Path("/NonExistingPath"),
false));
intercept(AbfsRestOperationException.class,
() -> client.deletePath(
"/NonExistingPath",
@ -223,13 +234,22 @@ public class ITestAzureBlobFileSystemDelete extends
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
fs.getAbfsStore().getClient(),
this.getConfiguration());
AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class, mockStore,
"client", mockClient);
mockStore = TestMockHelpers.setClassField(AzureBlobFileSystemStore.class,
mockStore,
"abfsPerfTracker",
TestAbfsPerfTracker.getAPerfTrackerInstance(this.getConfiguration()));
doCallRealMethod().when(mockStore).delete(new Path("/NonExistingPath"), false);
// Case 2: Mimic retried case
// Idempotency check on Delete always returns success
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
when(idempotencyRetOp.getResult()).thenReturn(http200Op);
AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
DeletePath, mockClient, HTTP_METHOD_DELETE,
TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
TestAbfsClient.getTestRequestHeaders(mockClient));
idempotencyRetOp.hardSetResult(HTTP_OK);
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
when(mockClient.deletePath("/NonExistingPath", false,
@ -244,6 +264,9 @@ public class ITestAzureBlobFileSystemDelete extends
.describedAs("Idempotency check reports successful "
+ "delete. 200OK should be returned")
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
// Call from AzureBlobFileSystemStore should not fail either
mockStore.delete(new Path("/NonExistingPath"), false);
}
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.regex.Pattern;
import org.junit.Test;
@ -351,4 +352,49 @@ public final class TestAbfsClient {
field.set(client, fieldObject);
return client;
}
/**
* Test helper method to access private createRequestUrl method.
* @param client test AbfsClient instace
* @param path path to generate Url
* @return return store path url
* @throws AzureBlobFileSystemException
*/
public static URL getTestUrl(AbfsClient client, String path) throws
AzureBlobFileSystemException {
final AbfsUriQueryBuilder abfsUriQueryBuilder
= client.createDefaultUriQueryBuilder();
return client.createRequestUrl(path, abfsUriQueryBuilder.toString());
}
/**
* Test helper method to access private createDefaultHeaders method.
* @param client test AbfsClient instance
* @return List of AbfsHttpHeaders
*/
public static List<AbfsHttpHeader> getTestRequestHeaders(AbfsClient client) {
return client.createDefaultHeaders();
}
/**
* Test helper method to create an AbfsRestOperation instance.
* @param type RestOpType
* @param client AbfsClient
* @param method HttpMethod
* @param url Test path url
* @param requestHeaders request headers
* @return instance of AbfsRestOperation
*/
public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
AbfsClient client,
String method,
URL url,
List<AbfsHttpHeader> requestHeaders) {
return new AbfsRestOperation(
type,
client,
method,
url,
requestHeaders);
}
}

View File

@ -34,6 +34,8 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import static org.assertj.core.api.Assertions.assertThat;
/**
@ -405,4 +407,15 @@ public final class TestAbfsPerfTracker {
tracker13.registerResult(httpOperation).registerSuccess(false).registerAggregates(Instant.MIN, TEST_AGGREGATE_COUNT);
}
}
/**
* Test helper method to create an AbfsPerfTracker instance.
* @param abfsConfig active test abfs config
* @return instance of AbfsPerfTracker
*/
public static AbfsPerfTracker getAPerfTrackerInstance(AbfsConfiguration abfsConfig) {
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
abfsConfig.getAccountName(), abfsConfig);
return tracker;
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
/**
* Test Mock Helpers.
*/
public final class TestMockHelpers {
/**
* Sets a class field by reflection.
* @param type
* @param obj
* @param fieldName
* @param fieldObject
* @param <T>
* @return
* @throws Exception
*/
public static <T> T setClassField(
Class<T> type,
final T obj,
final String fieldName,
Object fieldObject) throws Exception {
Field field = type.getDeclaredField(fieldName);
field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field,
field.getModifiers() & ~Modifier.FINAL);
field.set(obj, fieldObject);
return obj;
}
private TestMockHelpers() {
// Not called. - For checkstyle: HideUtilityClassConstructor
}
}