HADOOP-17215: Support for conditional overwrite.

Contributed by Sneha Vijayarajan

DETAILS:

    This change adds config key "fs.azure.enable.conditional.create.overwrite" with
    a default of true.  When enabled, if create(path, overwrite: true) is invoked
    and the file exists, the ABFS driver will first obtain its etag and then attempt
    to overwrite the file on the condition that the etag matches. The purpose of this
    is to mitigate the non-idempotency of this method.  Specifically, in the event of
    a network error or similar, the client will retry and this can result in the file
    being created more than once which may result in data loss.  In essence this is
    like a poor man's file handle, and will be addressed more thoroughly in the future
    when support for lease is added to ABFS.

TEST RESULTS:

    namespace.enabled=true
    auth.type=SharedKey
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 457, Failures: 0, Errors: 0, Skipped: 42
    Tests run: 207, Failures: 0, Errors: 0, Skipped: 24

    namespace.enabled=true
    auth.type=OAuth
    -------------------
    $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
    Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
    Tests run: 457, Failures: 0, Errors: 0, Skipped: 74
    Tests run: 207, Failures: 0, Errors: 0, Skipped: 140
This commit is contained in:
Sneha Vijayarajan 2020-08-26 00:31:35 +05:30 committed by Thomas Marquardt
parent 0dc54d0247
commit e31a636e92
No known key found for this signature in database
GPG Key ID: AEB30C9E78868287
10 changed files with 539 additions and 47 deletions

View File

@ -181,6 +181,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
private String azureAtomicDirs; private String azureAtomicDirs;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
private boolean enableConditionalCreateOverwrite;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY, @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES) DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs; private String azureAppendBlobDirs;
@ -573,6 +577,10 @@ public class AbfsConfiguration{
return this.azureAtomicDirs; return this.azureAtomicDirs;
} }
/**
 * Reports whether the conditional create-overwrite flow is enabled.
 *
 * @return the value held in {@code enableConditionalCreateOverwrite}
 */
public boolean isConditionalCreateOverwriteEnabled() {
  return enableConditionalCreateOverwrite;
}
public String getAppendBlobDirs() { public String getAppendBlobDirs() {
return this.azureAppendBlobDirs; return this.azureAppendBlobDirs;
} }

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@ -464,10 +465,32 @@ public class AzureBlobFileSystemStore implements Closeable {
isAppendBlob = true; isAppendBlob = true;
} }
final AbfsRestOperation op = client.createPath(relativePath, true, overwrite, // if "fs.azure.enable.conditional.create.overwrite" is enabled and
isNamespaceEnabled ? getOctalNotation(permission) : null, // is a create request with overwrite=true, create will follow different
isNamespaceEnabled ? getOctalNotation(umask) : null, // flow.
isAppendBlob); boolean triggerConditionalCreateOverwrite = false;
if (overwrite
&& abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
triggerConditionalCreateOverwrite = true;
}
AbfsRestOperation op;
if (triggerConditionalCreateOverwrite) {
op = conditionalCreateOverwriteFile(relativePath,
statistics,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob
);
} else {
op = client.createPath(relativePath, true,
overwrite,
isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null,
isAppendBlob,
null);
}
perfInfo.registerResult(op.getResult()).registerSuccess(true); perfInfo.registerResult(op.getResult()).registerSuccess(true);
return new AbfsOutputStream( return new AbfsOutputStream(
@ -479,6 +502,74 @@ public class AzureBlobFileSystemStore implements Closeable {
} }
} }
/**
 * Creates a file with overwrite semantics, replacing an existing file only
 * when its eTag still matches the one read immediately before the overwrite.
 *
 * <p>Sequence of requests:
 * <ol>
 *   <li>create with overwrite=false; on success the result is returned;</li>
 *   <li>on HTTP 409 (conflict), getPathStatus to read the existing eTag;</li>
 *   <li>create with overwrite=true, passing that eTag to the client.</li>
 * </ol>
 *
 * @param relativePath path of the file to create
 * @param statistics file system statistics; not used by this method
 * @param permission permission string passed through to createPath
 * @param umask umask string passed through to createPath
 * @param isAppendBlob append-blob flag passed through to createPath
 * @return the create operation that succeeded
 * @throws ConcurrentWriteOperationDetectedException if getPathStatus fails
 *         with HTTP 404 or the conditional overwrite fails with HTTP 412
 * @throws AzureBlobFileSystemException for any other failed request, which
 *         is rethrown unchanged
 */
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
    final FileSystem.Statistics statistics,
    final String permission,
    final String umask,
    final boolean isAppendBlob) throws AzureBlobFileSystemException {
  try {
    // Start with overwrite=false so that the eTag fetch is skipped whenever
    // no file pre-exists (the bulk of create traffic).
    return client.createPath(relativePath, true,
        false, permission, umask, isAppendBlob, null);
  } catch (AbfsRestOperationException createEx) {
    if (createEx.getStatusCode() != HttpURLConnection.HTTP_CONFLICT) {
      throw createEx;
    }
    // Conflict: the file pre-exists, continue with the conditional overwrite.
  }

  AbfsRestOperation statusOp;
  try {
    statusOp = client.getPathStatus(relativePath, false);
  } catch (AbfsRestOperationException statusEx) {
    if (statusEx.getStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
      throw statusEx;
    }
    // The file that caused the conflict has gone missing in the meantime,
    // so some other client is working on the same path.
    throw new ConcurrentWriteOperationDetectedException(
        "Parallel access to the create path detected. Failing request "
            + "to honor single writer semantics");
  }

  final String existingETag = statusOp.getResult()
      .getResponseHeader(HttpHeaderConfigurations.ETAG);

  try {
    // Overwrite only if the eTag still matches the properties fetched before.
    return client.createPath(relativePath, true,
        true, permission, umask, isAppendBlob, existingETag);
  } catch (AbfsRestOperationException overwriteEx) {
    if (overwriteEx.getStatusCode() != HttpURLConnection.HTTP_PRECON_FAILED) {
      throw overwriteEx;
    }
    // The eTag was just queried, so a precondition failure means a file with
    // a different eTag was created in between.
    throw new ConcurrentWriteOperationDetectedException(
        "Parallel access to the create path detected. Failing request "
            + "to honor single writer semantics");
  }
}
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) { private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
int bufferSize = abfsConfiguration.getWriteBufferSize(); int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
@ -508,7 +599,7 @@ public class AzureBlobFileSystemStore implements Closeable {
final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true, final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(permission) : null,
isNamespaceEnabled ? getOctalNotation(umask) : null, false); isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
perfInfo.registerResult(op.getResult()).registerSuccess(true); perfInfo.registerResult(op.getResult()).registerSuccess(true);
} }
} }

View File

@ -67,6 +67,10 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be
* overwritten only if there is a match on the eTag of existing file.
*/
public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created /** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/ * Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories"; public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";

View File

@ -70,6 +70,7 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
/**
 * Signals that another writer was detected operating on the same path while
 * a write operation was in progress.
 */
@org.apache.hadoop.classification.InterfaceAudience.Public
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class ConcurrentWriteOperationDetectedException
    extends AzureBlobFileSystemException {

  /**
   * @param message description of the detected concurrent access
   */
  public ConcurrentWriteOperationDetectedException(final String message) {
    super(message);
  }
}

View File

@ -265,7 +265,7 @@ public class AbfsClient implements Closeable {
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask, final String permission, final String umask,
final boolean isAppendBlob) throws AzureBlobFileSystemException { final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) { if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
@ -279,6 +279,10 @@ public class AbfsClient implements Closeable {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask)); requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
} }
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) { if (isAppendBlob) {

View File

@ -110,9 +110,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
connectionsMade++; connectionsMade++;
requestsSent++; requestsSent++;
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
sendRequestPath)) { sendRequestPath)) {
// Is a file overwrite case
long createRequestCalls = 1;
long createTriggeredGFSForETag = 0;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 1;
createTriggeredGFSForETag = 1;
}
for (int i = 0; i < LARGE_OPERATIONS; i++) { for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testNetworkStatsString.getBytes()); out.write(testNetworkStatsString.getBytes());
@ -141,17 +150,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
* wrote each time). * wrote each time).
* *
*/ */
connectionsMade += createRequestCalls + createTriggeredGFSForETag;
requestsSent += createRequestCalls;
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) { if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
// no network calls are made for hflush in case of appendblob // no network calls are made for hflush in case of appendblob
assertAbfsStatistics(CONNECTIONS_MADE, assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS, metricMap); connectionsMade + LARGE_OPERATIONS, metricMap);
assertAbfsStatistics(SEND_REQUESTS, assertAbfsStatistics(SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS, metricMap); requestsSent + LARGE_OPERATIONS, metricMap);
} else { } else {
assertAbfsStatistics(CONNECTIONS_MADE, assertAbfsStatistics(CONNECTIONS_MADE,
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); connectionsMade + LARGE_OPERATIONS * 2, metricMap);
assertAbfsStatistics(SEND_REQUESTS, assertAbfsStatistics(SEND_REQUESTS,
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); requestsSent + LARGE_OPERATIONS * 2, metricMap);
} }
assertAbfsStatistics(AbfsStatistic.BYTES_SENT, assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
@ -237,13 +249,21 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
try { try {
/* /*
* Creating a file and writing buffer into it. Also recording the * Creating a file and writing buffer into it.
* buffer for future read() call. * This is a file recreate, so it will trigger
* 2 extra calls if create overwrite is off by default.
* Also recording the buffer for future read() call.
* This creating outputStream and writing requires 2 * * This creating outputStream and writing requires 2 *
* (LARGE_OPERATIONS) get requests. * (LARGE_OPERATIONS) get requests.
*/ */
StringBuilder largeBuffer = new StringBuilder(); StringBuilder largeBuffer = new StringBuilder();
out = fs.create(getResponsePath); out = fs.create(getResponsePath);
long createRequestCalls = 1;
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
createRequestCalls += 2;
}
for (int i = 0; i < LARGE_OPERATIONS; i++) { for (int i = 0; i < LARGE_OPERATIONS; i++) {
out.write(testResponseString.getBytes()); out.write(testResponseString.getBytes());
out.hflush(); out.hflush();
@ -268,7 +288,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
* *
* get_response : get_responses(Last assertion) + 1 * get_response : get_responses(Last assertion) + 1
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
* LARGE_OPERATIONS times) + 1(open()) + 1(read()). * LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
* 1 (createOverwriteTriggeredGetForeTag).
* *
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
* bytes wrote each time (bytes_received is equal to bytes wrote in the * bytes wrote each time (bytes_received is equal to bytes wrote in the
@ -284,7 +305,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
getResponses + 3 + LARGE_OPERATIONS, metricMap); getResponses + 3 + LARGE_OPERATIONS, metricMap);
} else { } else {
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
metricMap);
} }
} finally { } finally {
@ -319,4 +341,4 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
IOUtils.cleanupWithLogger(LOG, out); IOUtils.cleanupWithLogger(LOG, out);
} }
} }
} }

View File

@ -21,18 +21,44 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FilterOutputStream; import java.io.FilterOutputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.UUID;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
/** /**
* Test create operation. * Test create operation.
@ -188,4 +214,250 @@ public class ITestAzureBlobFileSystemCreate extends
}); });
} }
/**
 * Verifies the number of connections made when creating a file for each of:
 * 1. overwrite=false, file does not pre-exist
 * 2. overwrite=false, file pre-exists
 * 3. overwrite=true, file does not pre-exist
 * 4. overwrite=true, file pre-exists
 * once with fs.azure.enable.conditional.create.overwrite=true and once with
 * fs.azure.enable.conditional.create.overwrite=false.
 * @throws Throwable on any test failure
 */
@Test
public void testDefaultCreateOverwriteFileTest() throws Throwable {
  for (boolean conditionalOverwriteEnabled : new boolean[] {true, false}) {
    testCreateFileOverwrite(conditionalOverwriteEnabled);
  }
}
/**
 * Creates files with and without overwrite against a fresh file system
 * instance and checks CONNECTIONS_MADE after every create call.
 *
 * @param enableConditionalCreateOverwrite value to set for
 *        fs.azure.enable.conditional.create.overwrite
 * @throws Throwable on any test failure
 */
public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
    throws Throwable {
  final AzureBlobFileSystem baseFs = getFileSystem();

  Configuration testConf = new Configuration(this.getRawConfiguration());
  testConf.set("fs.azure.enable.conditional.create.overwrite",
      Boolean.toString(enableConditionalCreateOverwrite));

  final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      baseFs.getUri(), testConf);

  // Running total of connections expected so far, starting from whatever
  // the new instance has already recorded.
  long expectedConnections = fs.getInstrumentationMap()
      .get(CONNECTIONS_MADE.getStatName());

  final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
      + UUID.randomUUID().toString());

  // Case 1: overwrite=false, file does not pre-exist.
  // Create succeeds with a single request to the server.
  fs.create(nonOverwriteFile, false);
  expectedConnections++;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());

  // Case 2: overwrite=false, file pre-exists.
  // Create fails, still with a single request to the server.
  intercept(FileAlreadyExistsException.class,
      () -> fs.create(nonOverwriteFile, false));
  expectedConnections++;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());

  final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
      + UUID.randomUUID().toString());

  // Case 3: overwrite=true, file does not pre-exist.
  // Create succeeds with a single request to the server.
  fs.create(overwriteFilePath, true);
  expectedConnections++;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());

  // Case 4: overwrite=true, file pre-exists.
  // With conditional overwrite enabled three requests are sent:
  //   1. create without overwrite
  //   2. GetFileStatus to get the eTag
  //   3. create with overwrite
  // otherwise a single create request is sent.
  fs.create(overwriteFilePath, true);
  expectedConnections += enableConditionalCreateOverwrite ? 3 : 1;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());
}
/**
 * Tests the failure paths of the conditional create-overwrite flow, i.e.
 * with fs.azure.enable.conditional.create.overwrite=true, where a create
 * with overwrite=true can result in up to 3 calls:
 * A. Create overwrite=false
 * B. GFS
 * C. Create overwrite=true
 *
 * Scn1: A fails with HTTP409, leading to B which fails with HTTP404,
 *       detect parallel access
 * Scn2: A fails with HTTP409, leading to B which fails with HTTP500,
 *       fail create with HTTP500
 * Scn3: A fails with HTTP409, leading to B and then C,
 *       which fails with HTTP412, detect parallel access
 * Scn4: A fails with HTTP409, leading to B and then C,
 *       which fails with HTTP500, fail create with HTTP500
 * Scn5: A fails with HTTP500, fail create with HTTP500
 *
 * @throws Throwable on any test failure
 */
@Test
public void testNegativeScenariosForCreateOverwriteDisabled()
    throws Throwable {

  final AzureBlobFileSystem currentFs = getFileSystem();
  Configuration config = new Configuration(this.getRawConfiguration());
  config.set("fs.azure.enable.conditional.create.overwrite",
      Boolean.toString(true));

  final AzureBlobFileSystem fs =
      (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
          config);

  // Get mock AbfsClient with current config
  AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
      fs.getAbfsStore().getClient(),
      fs.getAbfsStore().getAbfsConfiguration());

  AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
  abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient);

  // Only the status code is stubbed on the HTTP operation; any response
  // header read from it (including the eTag) falls back to Mockito's
  // default return value, which is why the overwrite stub below matches
  // the eTag argument with eq(null).
  AbfsRestOperation successOp = mock(
      AbfsRestOperation.class);
  AbfsHttpOperation http200Op = mock(
      AbfsHttpOperation.class);
  when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
  when(successOp.getResult()).thenReturn(http200Op);

  AbfsRestOperationException conflictResponseEx
      = getMockAbfsRestOperationException(HTTP_CONFLICT);
  AbfsRestOperationException serverErrorResponseEx
      = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
  AbfsRestOperationException fileNotFoundResponseEx
      = getMockAbfsRestOperationException(HTTP_NOT_FOUND);
  AbfsRestOperationException preConditionResponseEx
      = getMockAbfsRestOperationException(HTTP_PRECON_FAILED);

  // Step A (create overwrite=false), one answer per scenario.
  // permission and umask are matched with any() rather than
  // any(String.class): they are passed as null when the account is not
  // namespace enabled, and any(String.class) does not match null.
  doThrow(conflictResponseEx) // Scn1: conflict, then GFS fails with Http404
      .doThrow(conflictResponseEx) // Scn2: conflict, then GFS fails with Http500
      .doThrow(
          conflictResponseEx) // Scn3: conflict, then create overwrite=true fails with Http412
      .doThrow(
          conflictResponseEx) // Scn4: conflict, then create overwrite=true fails with Http500
      .doThrow(
          serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
      .when(mockClient)
      .createPath(any(String.class), eq(true), eq(false), any(),
          any(), any(boolean.class), eq(null));

  // Step B (GFS), one answer per scenario that reaches it.
  doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
      .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
      .doReturn(successOp) // Scn3: GFS passes
      .doReturn(successOp) // Scn4: GFS passes
      .when(mockClient)
      .getPathStatus(any(String.class), eq(false));

  // Step C (create overwrite=true), one answer per scenario that reaches it.
  doThrow(
      preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
      .doThrow(
          serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
      .when(mockClient)
      .createPath(any(String.class), eq(true), eq(true), any(),
          any(), any(boolean.class), eq(null));

  // Scn1: GFS fails with Http404
  // Sequence of events expected:
  // 1. create overwrite=false - fail with conflict
  // 2. GFS - fail with File Not found
  // Create will fail with ConcurrentWriteOperationDetectedException
  validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
      abfsStore);

  // Scn2: GFS fails with Http500
  // Sequence of events expected:
  // 1. create overwrite=false - fail with conflict
  // 2. GFS - fail with Server error
  // Create will fail with 500
  validateCreateFileException(AbfsRestOperationException.class, abfsStore);

  // Scn3: create overwrite=true fails with Http412
  // Sequence of events expected:
  // 1. create overwrite=false - fail with conflict
  // 2. GFS - pass
  // 3. create overwrite=true - fail with Pre-Condition
  // Create will fail with ConcurrentWriteOperationDetectedException
  validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
      abfsStore);

  // Scn4: create overwrite=true fails with Http500
  // Sequence of events expected:
  // 1. create overwrite=false - fail with conflict
  // 2. GFS - pass
  // 3. create overwrite=true - fail with Server error
  // Create will fail with 500
  validateCreateFileException(AbfsRestOperationException.class, abfsStore);

  // Scn5: create overwrite=false fails with Http500
  // Sequence of events expected:
  // 1. create overwrite=false - fail with server error
  // Create will fail with 500
  validateCreateFileException(AbfsRestOperationException.class, abfsStore);
}
/**
 * Overwrites a declared field of {@link AzureBlobFileSystemStore} through
 * reflection, bypassing access checks.
 *
 * @param abfsStore store instance whose field is replaced
 * @param fieldName name of the field declared on AzureBlobFileSystemStore
 * @param fieldObject value to assign to the field
 * @return the same {@code abfsStore} instance
 * @throws Exception if the field does not exist or cannot be written
 */
private AzureBlobFileSystemStore setAzureBlobSystemStoreField(
    final AzureBlobFileSystemStore abfsStore,
    final String fieldName,
    Object fieldObject) throws Exception {
  Field targetField = AzureBlobFileSystemStore.class.getDeclaredField(
      fieldName);
  targetField.setAccessible(true);

  // Instance fields (final or not) are writable once setAccessible(true)
  // has succeeded. Only static final fields need the FINAL bit cleared, so
  // the lookup of Field's private "modifiers" field (which is not available
  // on JDK 12 and later) is limited to that case.
  final int fieldModifiers = targetField.getModifiers();
  if (java.lang.reflect.Modifier.isStatic(fieldModifiers)
      && java.lang.reflect.Modifier.isFinal(fieldModifiers)) {
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(targetField,
        fieldModifiers & ~java.lang.reflect.Modifier.FINAL);
  }

  targetField.set(abfsStore, fieldObject);
  return abfsStore;
}
/**
 * Calls createFile with overwrite=true on the given store for the path
 * "testFile" and expects it to fail with the given exception type.
 *
 * @param exceptionClass exception type the create call must raise
 * @param abfsStore store on which createFile is invoked
 * @param <E> expected exception type
 * @throws Exception if the expected exception is not raised
 */
private <E extends Throwable> void validateCreateFileException(final Class<E> exceptionClass, final AzureBlobFileSystemStore abfsStore)
    throws Exception {
  final Path targetPath = new Path("testFile");
  final FsPermission fullAccess = new FsPermission(
      FsAction.ALL, FsAction.ALL, FsAction.ALL);
  final FsPermission emptyUmask = new FsPermission(
      FsAction.NONE, FsAction.NONE, FsAction.NONE);

  intercept(
      exceptionClass,
      () -> abfsStore.createFile(targetPath, null, true, fullAccess,
          emptyUmask));
}
/**
 * Builds an AbfsRestOperationException carrying the given HTTP status, with
 * empty strings for its two string arguments and a placeholder cause.
 *
 * @param status HTTP status code the exception should report
 * @return the newly constructed exception
 */
private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
  final Exception placeholderCause = new Exception();
  return new AbfsRestOperationException(status, "", "", placeholderCause);
}
} }

View File

@ -18,12 +18,18 @@
package org.apache.hadoop.fs.azurebfs; package org.apache.hadoop.fs.azurebfs;
import java.util.UUID;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
/** /**
* Test mkdir operation. * Test mkdir operation.
*/ */
@ -45,4 +51,58 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
public void testCreateRoot() throws Exception { public void testCreateRoot() throws Exception {
assertMkdirs(getFileSystem(), new Path("/")); assertMkdirs(getFileSystem(), new Path("/"));
} }
/**
 * Runs the mkdir connection-count checks once with
 * fs.azure.enable.conditional.create.overwrite=true and once with it set
 * to false.
 * @throws Throwable on any test failure
 */
@Test
public void testDefaultCreateOverwriteDirTest() throws Throwable {
  // The expected request counts in testCreateDirOverwrite are the same for
  // both values of fs.azure.enable.conditional.create.overwrite, i.e. the
  // config is expected to have no effect on mkdirs.
  for (boolean conditionalOverwriteEnabled : new boolean[] {true, false}) {
    testCreateDirOverwrite(conditionalOverwriteEnabled);
  }
}
/**
 * Calls mkdirs twice on the same new path against a fresh file system
 * instance and checks that each call adds exactly one to CONNECTIONS_MADE.
 *
 * @param enableConditionalCreateOverwrite value to set for
 *        fs.azure.enable.conditional.create.overwrite
 * @throws Throwable on any test failure
 */
public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
    throws Throwable {
  final AzureBlobFileSystem baseFs = getFileSystem();

  Configuration testConf = new Configuration(this.getRawConfiguration());
  testConf.set("fs.azure.enable.conditional.create.overwrite",
      Boolean.toString(enableConditionalCreateOverwrite));

  final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
      baseFs.getUri(), testConf);

  // Running total of connections expected so far, starting from whatever
  // the new instance has already recorded.
  long expectedConnections = fs.getInstrumentationMap()
      .get(CONNECTIONS_MADE.getStatName());

  final Path dirPath = new Path("/DirPath_"
      + UUID.randomUUID().toString());

  // Case 1: directory does not pre-exist - one request to the server.
  fs.mkdirs(dirPath);
  expectedConnections++;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());

  // Case 2: directory pre-exists - mkdirs does not fail and again sends
  // one request to the server.
  fs.mkdirs(dirPath);
  expectedConnections++;
  assertAbfsStatistics(CONNECTIONS_MADE, expectedConnections,
      fs.getInstrumentationMap());
}
} }

View File

@ -283,8 +283,7 @@ public final class TestAbfsClient {
} }
public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) AbfsConfiguration abfsConfig) throws Exception {
throws IOException, NoSuchFieldException, IllegalAccessException {
AuthType currentAuthType = abfsConfig.getAuthType( AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName()); abfsConfig.getAccountName());
@ -310,47 +309,46 @@ public final class TestAbfsClient {
when(client.createDefaultHeaders()).thenCallRealMethod(); when(client.createDefaultHeaders()).thenCallRealMethod();
// override baseurl // override baseurl
Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl"); client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
baseUrlField.setAccessible(true); abfsConfig);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true); // override baseurl
modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL); client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl()); baseAbfsClientInstance.getBaseUrl());
// override auth provider // override auth provider
if (currentAuthType == AuthType.SharedKey) { if (currentAuthType == AuthType.SharedKey) {
Field sharedKeyCredsField = AbfsClient.class.getDeclaredField( client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
"sharedKeyCredentials"); new SharedKeyCredentials(
sharedKeyCredsField.setAccessible(true); abfsConfig.getAccountName().substring(0,
modifiersField.setInt(sharedKeyCredsField, abfsConfig.getAccountName().indexOf(DOT)),
sharedKeyCredsField.getModifiers() abfsConfig.getStorageAccountKey()));
& ~java.lang.reflect.Modifier.FINAL);
sharedKeyCredsField.set(client, new SharedKeyCredentials(
abfsConfig.getAccountName().substring(0,
abfsConfig.getAccountName().indexOf(DOT)),
abfsConfig.getStorageAccountKey()));
} else { } else {
Field tokenProviderField = AbfsClient.class.getDeclaredField( client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
"tokenProvider"); abfsConfig.getTokenProvider());
tokenProviderField.setAccessible(true);
modifiersField.setInt(tokenProviderField,
tokenProviderField.getModifiers()
& ~java.lang.reflect.Modifier.FINAL);
tokenProviderField.set(client, abfsConfig.getTokenProvider());
} }
// override user agent // override user agent
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
+ "UNKNOWN/UNKNOWN) MSFT"; + "UNKNOWN/UNKNOWN) MSFT";
Field userAgentField = AbfsClient.class.getDeclaredField( client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
"userAgent");
userAgentField.setAccessible(true);
modifiersField.setInt(userAgentField,
userAgentField.getModifiers()
& ~java.lang.reflect.Modifier.FINAL);
userAgentField.set(client, userAgent);
return client; return client;
} }
/**
 * Overwrites a declared field of {@link AbfsClient} through reflection,
 * bypassing access checks.
 *
 * @param client client instance whose field is replaced
 * @param fieldName name of the field declared on AbfsClient
 * @param fieldObject value to assign to the field
 * @return the same {@code client} instance
 * @throws Exception if the field does not exist or cannot be written
 */
private static AbfsClient setAbfsClientField(
    final AbfsClient client,
    final String fieldName,
    Object fieldObject) throws Exception {
  Field field = AbfsClient.class.getDeclaredField(fieldName);
  field.setAccessible(true);

  // Instance fields (final or not) are writable once setAccessible(true)
  // has succeeded. Only static final fields need the FINAL bit cleared, so
  // the lookup of Field's private "modifiers" field (which is not available
  // on JDK 12 and later) is limited to that case.
  final int fieldModifiers = field.getModifiers();
  if (java.lang.reflect.Modifier.isStatic(fieldModifiers)
      && java.lang.reflect.Modifier.isFinal(fieldModifiers)) {
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field,
        fieldModifiers & ~java.lang.reflect.Modifier.FINAL);
  }

  field.set(client, fieldObject);
  return client;
}
} }