HADOOP-17215: Support for conditional overwrite.
Contributed by Sneha Vijayarajan DETAILS: This change adds config key "fs.azure.enable.conditional.create.overwrite" with a default of true. When enabled, if create(path, overwrite: true) is invoked and the file exists, the ABFS driver will first obtain its etag and then attempt to overwrite the file on the condition that the etag matches. The purpose of this is to mitigate the non-idempotency of this method. Specifically, in the event of a network error or similar, the client will retry and this can result in the file being created more than once which may result in data loss. In essense this is like a poor man's file handle, and will be addressed more thoroughly in the future when support for lease is added to ABFS. TEST RESULTS: namespace.enabled=true auth.type=SharedKey ------------------- $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify Tests run: 87, Failures: 0, Errors: 0, Skipped: 0 Tests run: 457, Failures: 0, Errors: 0, Skipped: 42 Tests run: 207, Failures: 0, Errors: 0, Skipped: 24 namespace.enabled=true auth.type=OAuth ------------------- $mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify Tests run: 87, Failures: 0, Errors: 0, Skipped: 0 Tests run: 457, Failures: 0, Errors: 0, Skipped: 74 Tests run: 207, Failures: 0, Errors: 0, Skipped: 140
This commit is contained in:
parent
f208da286c
commit
d166420302
|
@ -181,6 +181,10 @@ public class AbfsConfiguration{
|
|||
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
|
||||
private String azureAtomicDirs;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
|
||||
private boolean enableConditionalCreateOverwrite;
|
||||
|
||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
|
||||
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
|
||||
private String azureAppendBlobDirs;
|
||||
|
@ -573,6 +577,10 @@ public class AbfsConfiguration{
|
|||
return this.azureAtomicDirs;
|
||||
}
|
||||
|
||||
public boolean isConditionalCreateOverwriteEnabled() {
|
||||
return this.enableConditionalCreateOverwrite;
|
||||
}
|
||||
|
||||
public String getAppendBlobDirs() {
|
||||
return this.azureAppendBlobDirs;
|
||||
}
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
|
|||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
|
||||
|
@ -464,10 +465,32 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
isAppendBlob = true;
|
||||
}
|
||||
|
||||
final AbfsRestOperation op = client.createPath(relativePath, true, overwrite,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null,
|
||||
isAppendBlob);
|
||||
// if "fs.azure.enable.conditional.create.overwrite" is enabled and
|
||||
// is a create request with overwrite=true, create will follow different
|
||||
// flow.
|
||||
boolean triggerConditionalCreateOverwrite = false;
|
||||
if (overwrite
|
||||
&& abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
|
||||
triggerConditionalCreateOverwrite = true;
|
||||
}
|
||||
|
||||
AbfsRestOperation op;
|
||||
if (triggerConditionalCreateOverwrite) {
|
||||
op = conditionalCreateOverwriteFile(relativePath,
|
||||
statistics,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null,
|
||||
isAppendBlob
|
||||
);
|
||||
|
||||
} else {
|
||||
op = client.createPath(relativePath, true,
|
||||
overwrite,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null,
|
||||
isAppendBlob,
|
||||
null);
|
||||
}
|
||||
perfInfo.registerResult(op.getResult()).registerSuccess(true);
|
||||
|
||||
return new AbfsOutputStream(
|
||||
|
@ -479,6 +502,74 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Conditional create overwrite flow ensures that create overwrites is done
|
||||
* only if there is match for eTag of existing file.
|
||||
* @param relativePath
|
||||
* @param statistics
|
||||
* @param permission
|
||||
* @param umask
|
||||
* @param isAppendBlob
|
||||
* @return
|
||||
* @throws AzureBlobFileSystemException
|
||||
*/
|
||||
private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
|
||||
final FileSystem.Statistics statistics,
|
||||
final String permission,
|
||||
final String umask,
|
||||
final boolean isAppendBlob) throws AzureBlobFileSystemException {
|
||||
AbfsRestOperation op;
|
||||
|
||||
try {
|
||||
// Trigger a create with overwrite=false first so that eTag fetch can be
|
||||
// avoided for cases when no pre-existing file is present (major portion
|
||||
// of create file traffic falls into the case of no pre-existing file).
|
||||
op = client.createPath(relativePath, true,
|
||||
false, permission, umask, isAppendBlob, null);
|
||||
} catch (AbfsRestOperationException e) {
|
||||
if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
|
||||
// File pre-exists, fetch eTag
|
||||
try {
|
||||
op = client.getPathStatus(relativePath, false);
|
||||
} catch (AbfsRestOperationException ex) {
|
||||
if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
|
||||
// Is a parallel access case, as file which was found to be
|
||||
// present went missing by this request.
|
||||
throw new ConcurrentWriteOperationDetectedException(
|
||||
"Parallel access to the create path detected. Failing request "
|
||||
+ "to honor single writer semantics");
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
String eTag = op.getResult()
|
||||
.getResponseHeader(HttpHeaderConfigurations.ETAG);
|
||||
|
||||
try {
|
||||
// overwrite only if eTag matches with the file properties fetched befpre
|
||||
op = client.createPath(relativePath, true,
|
||||
true, permission, umask, isAppendBlob, eTag);
|
||||
} catch (AbfsRestOperationException ex) {
|
||||
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
|
||||
// Is a parallel access case, as file with eTag was just queried
|
||||
// and precondition failure can happen only when another file with
|
||||
// different etag got created.
|
||||
throw new ConcurrentWriteOperationDetectedException(
|
||||
"Parallel access to the create path detected. Failing request "
|
||||
+ "to honor single writer semantics");
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
return op;
|
||||
}
|
||||
|
||||
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
|
||||
int bufferSize = abfsConfiguration.getWriteBufferSize();
|
||||
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
|
||||
|
@ -508,7 +599,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
|
||||
final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true,
|
||||
isNamespaceEnabled ? getOctalNotation(permission) : null,
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null, false);
|
||||
isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
|
||||
perfInfo.registerResult(op.getResult()).registerSuccess(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,10 @@ public final class ConfigurationKeys {
|
|||
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
|
||||
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
|
||||
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
|
||||
/** This config ensures that during create overwrite an existing file will be
|
||||
* overwritten only if there is a match on the eTag of existing file.
|
||||
*/
|
||||
public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
|
||||
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
|
||||
* Default is empty. **/
|
||||
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
|
||||
|
|
|
@ -70,6 +70,7 @@ public final class FileSystemConfigurations {
|
|||
public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false;
|
||||
|
||||
public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase";
|
||||
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
|
||||
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
|
||||
|
||||
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
|
||||
|
||||
/**
|
||||
* Thrown when a concurrent write operation is detected.
|
||||
*/
|
||||
@org.apache.hadoop.classification.InterfaceAudience.Public
|
||||
@org.apache.hadoop.classification.InterfaceStability.Evolving
|
||||
public class ConcurrentWriteOperationDetectedException
|
||||
extends AzureBlobFileSystemException {
|
||||
|
||||
public ConcurrentWriteOperationDetectedException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
|
@ -265,7 +265,7 @@ public class AbfsClient implements Closeable {
|
|||
|
||||
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
|
||||
final String permission, final String umask,
|
||||
final boolean isAppendBlob) throws AzureBlobFileSystemException {
|
||||
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
if (!overwrite) {
|
||||
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
|
||||
|
@ -279,6 +279,10 @@ public class AbfsClient implements Closeable {
|
|||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
|
||||
}
|
||||
|
||||
if (eTag != null && !eTag.isEmpty()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
|
||||
}
|
||||
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
|
||||
if (isAppendBlob) {
|
||||
|
|
|
@ -110,9 +110,18 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
connectionsMade++;
|
||||
requestsSent++;
|
||||
|
||||
|
||||
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
||||
sendRequestPath)) {
|
||||
|
||||
// Is a file overwrite case
|
||||
long createRequestCalls = 1;
|
||||
long createTriggeredGFSForETag = 0;
|
||||
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
|
||||
createRequestCalls += 1;
|
||||
createTriggeredGFSForETag = 1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
out.write(testNetworkStatsString.getBytes());
|
||||
|
||||
|
@ -141,17 +150,20 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
* wrote each time).
|
||||
*
|
||||
*/
|
||||
|
||||
connectionsMade += createRequestCalls + createTriggeredGFSForETag;
|
||||
requestsSent += createRequestCalls;
|
||||
if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())) {
|
||||
// no network calls are made for hflush in case of appendblob
|
||||
assertAbfsStatistics(CONNECTIONS_MADE,
|
||||
connectionsMade + 1 + LARGE_OPERATIONS, metricMap);
|
||||
connectionsMade + LARGE_OPERATIONS, metricMap);
|
||||
assertAbfsStatistics(SEND_REQUESTS,
|
||||
requestsSent + 1 + LARGE_OPERATIONS, metricMap);
|
||||
requestsSent + LARGE_OPERATIONS, metricMap);
|
||||
} else {
|
||||
assertAbfsStatistics(CONNECTIONS_MADE,
|
||||
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
|
||||
connectionsMade + LARGE_OPERATIONS * 2, metricMap);
|
||||
assertAbfsStatistics(SEND_REQUESTS,
|
||||
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
|
||||
requestsSent + LARGE_OPERATIONS * 2, metricMap);
|
||||
}
|
||||
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
||||
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
|
||||
|
@ -237,13 +249,21 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
try {
|
||||
|
||||
/*
|
||||
* Creating a file and writing buffer into it. Also recording the
|
||||
* buffer for future read() call.
|
||||
* Creating a file and writing buffer into it.
|
||||
* This is a file recreate, so it will trigger
|
||||
* 2 extra calls if create overwrite is off by default.
|
||||
* Also recording the buffer for future read() call.
|
||||
* This creating outputStream and writing requires 2 *
|
||||
* (LARGE_OPERATIONS) get requests.
|
||||
*/
|
||||
StringBuilder largeBuffer = new StringBuilder();
|
||||
out = fs.create(getResponsePath);
|
||||
|
||||
long createRequestCalls = 1;
|
||||
if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) {
|
||||
createRequestCalls += 2;
|
||||
}
|
||||
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
out.write(testResponseString.getBytes());
|
||||
out.hflush();
|
||||
|
@ -268,7 +288,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
*
|
||||
* get_response : get_responses(Last assertion) + 1
|
||||
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
|
||||
* LARGE_OPERATIONS times) + 1(open()) + 1(read()).
|
||||
* LARGE_OPERATIONS times) + 1(open()) + 1(read()) +
|
||||
* 1 (createOverwriteTriggeredGetForeTag).
|
||||
*
|
||||
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
|
||||
* bytes wrote each time (bytes_received is equal to bytes wrote in the
|
||||
|
@ -284,7 +305,8 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
getResponses + 3 + LARGE_OPERATIONS, metricMap);
|
||||
} else {
|
||||
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
||||
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
|
||||
getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS,
|
||||
metricMap);
|
||||
}
|
||||
|
||||
} finally {
|
||||
|
@ -319,4 +341,4 @@ public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
|||
IOUtils.cleanupWithLogger(LOG, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,18 +21,44 @@ package org.apache.hadoop.fs.azurebfs;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.FilterOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.EnumSet;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_CONFLICT;
|
||||
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||
|
||||
/**
|
||||
* Test create operation.
|
||||
|
@ -188,4 +214,250 @@ public class ITestAzureBlobFileSystemCreate extends
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if the number of connections made for:
|
||||
* 1. create overwrite=false of a file that doesnt pre-exist
|
||||
* 2. create overwrite=false of a file that pre-exists
|
||||
* 3. create overwrite=true of a file that doesnt pre-exist
|
||||
* 4. create overwrite=true of a file that pre-exists
|
||||
* matches the expectation when run against both combinations of
|
||||
* fs.azure.enable.conditional.create.overwrite=true and
|
||||
* fs.azure.enable.conditional.create.overwrite=false
|
||||
* @throws Throwable
|
||||
*/
|
||||
@Test
|
||||
public void testDefaultCreateOverwriteFileTest() throws Throwable {
|
||||
testCreateFileOverwrite(true);
|
||||
testCreateFileOverwrite(false);
|
||||
}
|
||||
|
||||
public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
|
||||
throws Throwable {
|
||||
final AzureBlobFileSystem currentFs = getFileSystem();
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
config.set("fs.azure.enable.conditional.create.overwrite",
|
||||
Boolean.toString(enableConditionalCreateOverwrite));
|
||||
|
||||
final AzureBlobFileSystem fs =
|
||||
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
|
||||
config);
|
||||
|
||||
long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
int createRequestCount = 0;
|
||||
final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
|
||||
+ UUID.randomUUID().toString());
|
||||
|
||||
// Case 1: Not Overwrite - File does not pre-exist
|
||||
// create should be successful
|
||||
fs.create(nonOverwriteFile, false);
|
||||
|
||||
// One request to server to create path should be issued
|
||||
createRequestCount++;
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + createRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
|
||||
// Case 2: Not Overwrite - File pre-exists
|
||||
intercept(FileAlreadyExistsException.class,
|
||||
() -> fs.create(nonOverwriteFile, false));
|
||||
|
||||
// One request to server to create path should be issued
|
||||
createRequestCount++;
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + createRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
|
||||
final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
|
||||
+ UUID.randomUUID().toString());
|
||||
|
||||
// Case 3: Overwrite - File does not pre-exist
|
||||
// create should be successful
|
||||
fs.create(overwriteFilePath, true);
|
||||
|
||||
// One request to server to create path should be issued
|
||||
createRequestCount++;
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + createRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
|
||||
// Case 4: Overwrite - File pre-exists
|
||||
fs.create(overwriteFilePath, true);
|
||||
|
||||
if (enableConditionalCreateOverwrite) {
|
||||
// Three requests will be sent to server to create path,
|
||||
// 1. create without overwrite
|
||||
// 2. GetFileStatus to get eTag
|
||||
// 3. create with overwrite
|
||||
createRequestCount += 3;
|
||||
} else {
|
||||
createRequestCount++;
|
||||
}
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + createRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test negative scenarios with Create overwrite=false as default
|
||||
* With create overwrite=true ending in 3 calls:
|
||||
* A. Create overwrite=false
|
||||
* B. GFS
|
||||
* C. Create overwrite=true
|
||||
*
|
||||
* Scn1: A fails with HTTP409, leading to B which fails with HTTP404,
|
||||
* detect parallel access
|
||||
* Scn2: A fails with HTTP409, leading to B which fails with HTTP500,
|
||||
* fail create with HTTP500
|
||||
* Scn3: A fails with HTTP409, leading to B and then C,
|
||||
* which fails with HTTP412, detect parallel access
|
||||
* Scn4: A fails with HTTP409, leading to B and then C,
|
||||
* which fails with HTTP500, fail create with HTTP500
|
||||
* Scn5: A fails with HTTP500, fail create with HTTP500
|
||||
*/
|
||||
@Test
|
||||
public void testNegativeScenariosForCreateOverwriteDisabled()
|
||||
throws Throwable {
|
||||
|
||||
final AzureBlobFileSystem currentFs = getFileSystem();
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
config.set("fs.azure.enable.conditional.create.overwrite",
|
||||
Boolean.toString(true));
|
||||
|
||||
final AzureBlobFileSystem fs =
|
||||
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
|
||||
config);
|
||||
|
||||
// Get mock AbfsClient with current config
|
||||
AbfsClient
|
||||
mockClient
|
||||
= TestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
|
||||
abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient);
|
||||
|
||||
AbfsRestOperation successOp = mock(
|
||||
AbfsRestOperation.class);
|
||||
AbfsHttpOperation http200Op = mock(
|
||||
AbfsHttpOperation.class);
|
||||
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
|
||||
when(successOp.getResult()).thenReturn(http200Op);
|
||||
|
||||
AbfsRestOperationException conflictResponseEx
|
||||
= getMockAbfsRestOperationException(HTTP_CONFLICT);
|
||||
AbfsRestOperationException serverErrorResponseEx
|
||||
= getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR);
|
||||
AbfsRestOperationException fileNotFoundResponseEx
|
||||
= getMockAbfsRestOperationException(HTTP_NOT_FOUND);
|
||||
AbfsRestOperationException preConditionResponseEx
|
||||
= getMockAbfsRestOperationException(HTTP_PRECON_FAILED);
|
||||
|
||||
doThrow(conflictResponseEx) // Scn1: GFS fails with Http404
|
||||
.doThrow(conflictResponseEx) // Scn2: GFS fails with Http500
|
||||
.doThrow(
|
||||
conflictResponseEx) // Scn3: create overwrite=true fails with Http412
|
||||
.doThrow(
|
||||
conflictResponseEx) // Scn4: create overwrite=true fails with Http500
|
||||
.doThrow(
|
||||
serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500
|
||||
.when(mockClient)
|
||||
.createPath(any(String.class), eq(true), eq(false), any(String.class),
|
||||
any(String.class), any(boolean.class), eq(null));
|
||||
|
||||
doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404
|
||||
.doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500
|
||||
.doReturn(successOp) // Scn3: create overwrite=true fails with Http412
|
||||
.doReturn(successOp) // Scn4: create overwrite=true fails with Http500
|
||||
.when(mockClient)
|
||||
.getPathStatus(any(String.class), eq(false));
|
||||
|
||||
doThrow(
|
||||
preConditionResponseEx) // Scn3: create overwrite=true fails with Http412
|
||||
.doThrow(
|
||||
serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500
|
||||
.when(mockClient)
|
||||
.createPath(any(String.class), eq(true), eq(true), any(String.class),
|
||||
any(String.class), any(boolean.class), eq(null));
|
||||
|
||||
// Scn1: GFS fails with Http404
|
||||
// Sequence of events expected:
|
||||
// 1. create overwrite=false - fail with conflict
|
||||
// 2. GFS - fail with File Not found
|
||||
// Create will fail with ConcurrentWriteOperationDetectedException
|
||||
validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
|
||||
abfsStore);
|
||||
|
||||
// Scn2: GFS fails with Http500
|
||||
// Sequence of events expected:
|
||||
// 1. create overwrite=false - fail with conflict
|
||||
// 2. GFS - fail with Server error
|
||||
// Create will fail with 500
|
||||
validateCreateFileException(AbfsRestOperationException.class, abfsStore);
|
||||
|
||||
// Scn3: create overwrite=true fails with Http412
|
||||
// Sequence of events expected:
|
||||
// 1. create overwrite=false - fail with conflict
|
||||
// 2. GFS - pass
|
||||
// 3. create overwrite=true - fail with Pre-Condition
|
||||
// Create will fail with ConcurrentWriteOperationDetectedException
|
||||
validateCreateFileException(ConcurrentWriteOperationDetectedException.class,
|
||||
abfsStore);
|
||||
|
||||
// Scn4: create overwrite=true fails with Http500
|
||||
// Sequence of events expected:
|
||||
// 1. create overwrite=false - fail with conflict
|
||||
// 2. GFS - pass
|
||||
// 3. create overwrite=true - fail with Server error
|
||||
// Create will fail with 500
|
||||
validateCreateFileException(AbfsRestOperationException.class, abfsStore);
|
||||
|
||||
// Scn5: create overwrite=false fails with Http500
|
||||
// Sequence of events expected:
|
||||
// 1. create overwrite=false - fail with server error
|
||||
// Create will fail with 500
|
||||
validateCreateFileException(AbfsRestOperationException.class, abfsStore);
|
||||
}
|
||||
|
||||
private AzureBlobFileSystemStore setAzureBlobSystemStoreField(
|
||||
final AzureBlobFileSystemStore abfsStore,
|
||||
final String fieldName,
|
||||
Object fieldObject) throws Exception {
|
||||
|
||||
Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
|
||||
fieldName);
|
||||
abfsClientField.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(abfsClientField,
|
||||
abfsClientField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
|
||||
abfsClientField.set(abfsStore, fieldObject);
|
||||
return abfsStore;
|
||||
}
|
||||
|
||||
private <E extends Throwable> void validateCreateFileException(final Class<E> exceptionClass, final AzureBlobFileSystemStore abfsStore)
|
||||
throws Exception {
|
||||
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
|
||||
FsAction.ALL);
|
||||
FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
|
||||
FsAction.NONE);
|
||||
Path testPath = new Path("testFile");
|
||||
intercept(
|
||||
exceptionClass,
|
||||
() -> abfsStore.createFile(testPath, null, true, permission, umask));
|
||||
}
|
||||
|
||||
private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
|
||||
return new AbfsRestOperationException(status, "", "", new Exception());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,12 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||
|
||||
/**
|
||||
* Test mkdir operation.
|
||||
*/
|
||||
|
@ -45,4 +51,58 @@ public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
|
|||
public void testCreateRoot() throws Exception {
|
||||
assertMkdirs(getFileSystem(), new Path("/"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test mkdir for possible values of fs.azure.disable.default.create.overwrite
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDefaultCreateOverwriteDirTest() throws Throwable {
|
||||
// the config fs.azure.disable.default.create.overwrite should have no
|
||||
// effect on mkdirs
|
||||
testCreateDirOverwrite(true);
|
||||
testCreateDirOverwrite(false);
|
||||
}
|
||||
|
||||
public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
|
||||
throws Throwable {
|
||||
final AzureBlobFileSystem currentFs = getFileSystem();
|
||||
Configuration config = new Configuration(this.getRawConfiguration());
|
||||
config.set("fs.azure.enable.conditional.create.overwrite",
|
||||
Boolean.toString(enableConditionalCreateOverwrite));
|
||||
|
||||
final AzureBlobFileSystem fs =
|
||||
(AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
|
||||
config);
|
||||
|
||||
long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
int mkdirRequestCount = 0;
|
||||
final Path dirPath = new Path("/DirPath_"
|
||||
+ UUID.randomUUID().toString());
|
||||
|
||||
// Case 1: Dir does not pre-exist
|
||||
fs.mkdirs(dirPath);
|
||||
|
||||
// One request to server
|
||||
mkdirRequestCount++;
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + mkdirRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
|
||||
// Case 2: Dir pre-exists
|
||||
// Mkdir on existing Dir path will not lead to failure
|
||||
fs.mkdirs(dirPath);
|
||||
|
||||
// One request to server
|
||||
mkdirRequestCount++;
|
||||
|
||||
assertAbfsStatistics(
|
||||
CONNECTIONS_MADE,
|
||||
totalConnectionMadeBeforeTest + mkdirRequestCount,
|
||||
fs.getInstrumentationMap());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -283,8 +283,7 @@ public final class TestAbfsClient {
|
|||
}
|
||||
|
||||
public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
AbfsConfiguration abfsConfig)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
AbfsConfiguration abfsConfig) throws Exception {
|
||||
AuthType currentAuthType = abfsConfig.getAuthType(
|
||||
abfsConfig.getAccountName());
|
||||
|
||||
|
@ -310,47 +309,46 @@ public final class TestAbfsClient {
|
|||
when(client.createDefaultHeaders()).thenCallRealMethod();
|
||||
|
||||
// override baseurl
|
||||
Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
|
||||
baseUrlField.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
|
||||
baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
|
||||
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
abfsConfig);
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
|
||||
baseAbfsClientInstance.getBaseUrl());
|
||||
|
||||
// override auth provider
|
||||
if (currentAuthType == AuthType.SharedKey) {
|
||||
Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
|
||||
"sharedKeyCredentials");
|
||||
sharedKeyCredsField.setAccessible(true);
|
||||
modifiersField.setInt(sharedKeyCredsField,
|
||||
sharedKeyCredsField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
sharedKeyCredsField.set(client, new SharedKeyCredentials(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)),
|
||||
abfsConfig.getStorageAccountKey()));
|
||||
client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
|
||||
new SharedKeyCredentials(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)),
|
||||
abfsConfig.getStorageAccountKey()));
|
||||
} else {
|
||||
Field tokenProviderField = AbfsClient.class.getDeclaredField(
|
||||
"tokenProvider");
|
||||
tokenProviderField.setAccessible(true);
|
||||
modifiersField.setInt(tokenProviderField,
|
||||
tokenProviderField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
tokenProviderField.set(client, abfsConfig.getTokenProvider());
|
||||
client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
|
||||
abfsConfig.getTokenProvider());
|
||||
}
|
||||
|
||||
// override user agent
|
||||
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
|
||||
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
|
||||
+ "UNKNOWN/UNKNOWN) MSFT";
|
||||
Field userAgentField = AbfsClient.class.getDeclaredField(
|
||||
"userAgent");
|
||||
userAgentField.setAccessible(true);
|
||||
modifiersField.setInt(userAgentField,
|
||||
userAgentField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
userAgentField.set(client, userAgent);
|
||||
client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
private static AbfsClient setAbfsClientField(
|
||||
final AbfsClient client,
|
||||
final String fieldName,
|
||||
Object fieldObject) throws Exception {
|
||||
|
||||
Field field = AbfsClient.class.getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(field,
|
||||
field.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
|
||||
field.set(client, fieldObject);
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue