HADOOP-15086. NativeAzureFileSystem file rename is not atomic.

Contributed by Thomas Marquardt
Steve Loughran 2017-12-22 11:39:55 +00:00
parent 76e664e931
commit 52babbb4a0
9 changed files with 145 additions and 17 deletions
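In outline, the patch threads a new overwriteDestination flag from NativeAzureFileSystem#rename down through NativeFileSystemStore#rename to startCopyFromBlob, and maps overwriteDestination == false onto a conditional server-side copy. The sketch below is illustrative only: it assumes the Azure Storage SDK types already used by hadoop-azure (AccessCondition, CloudBlockBlob, OperationContext), and the class, method, and variable names outside those types are hypothetical, not names from this patch.

import java.net.URI;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

// Sketch of the conditional copy behind this patch. Only
// AccessCondition.generateIfNotExistsCondition() and the five-argument
// startCopy(...) call are taken from the patch itself.
class ConditionalCopySketch {
  static void copyBlob(CloudBlockBlob dstBlob, URI srcUri, boolean overwriteDestination,
      BlobRequestOptions options, OperationContext opContext) throws StorageException {
    // With overwriteDestination == false the destination gets an If-Not-Exists
    // access condition, so the service rejects the copy with HTTP 409
    // (BlobAlreadyExists) when a blob is already present at the destination.
    AccessCondition dstCondition = overwriteDestination
        ? null
        : AccessCondition.generateIfNotExistsCondition();
    dstBlob.startCopy(srcUri, null /* no source condition */, dstCondition, options, opContext);
  }
}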

AzureNativeFileSystemStore.java

@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void rename(String srcKey, String dstKey) throws IOException {
- rename(srcKey, dstKey, false, null);
+ rename(srcKey, dstKey, false, null, true);
}
@Override
public void rename(String srcKey, String dstKey, boolean acquireLease,
SelfRenewingLease existingLease) throws IOException {
+ rename(srcKey, dstKey, acquireLease, existingLease, true);
+ }
+ @Override
+ public void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
LOG.debug("Moving {} to {}", srcKey, dstKey);
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// a more intensive exponential retry policy when the cluster is getting
// throttled.
try {
- dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, null,
+ getInstrumentedContext(), overwriteDestination);
} catch (StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
options.setRetryPolicyFactory(new RetryExponentialRetry(
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMaxRetries));
- dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, options,
+ getInstrumentedContext(), overwriteDestination);
} else {
throw se;
}

NativeAzureFileSystem.java

@@ -3269,17 +3269,28 @@ public class NativeAzureFileSystem extends FileSystem {
} else if (!srcMetadata.isDir()) {
LOG.debug("Source {} found as a file, renaming.", src);
try {
- store.rename(srcKey, dstKey);
+ // HADOOP-15086 - file rename must ensure that the destination does
+ // not exist. The fix is targeted to this call only to avoid
+ // regressions. Other call sites are attempting to rename temporary
+ // files, redo a failed rename operation, or rename a directory
+ // recursively; for these cases the destination may exist.
+ store.rename(srcKey, dstKey, false, null,
+ false);
} catch(IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
- if (innerException instanceof StorageException
- && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ if (innerException instanceof StorageException) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException(
+ (StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
}
+ if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+ (StorageException) innerException)) {
+ LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+ return false;
+ }
+ }
throw ex;
}
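From the FileSystem API, the effect of the non-overwriting call above is that a file-to-file rename whose destination already exists now reports failure rather than silently replacing the destination. A hypothetical caller would see it as in the sketch below; the account URI, paths, and error handling are illustrative only.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical WASB client; names here are made up for illustration.
class RenameCallerSketch {
  static void renameOnce(Configuration conf) throws Exception {
    FileSystem fs = FileSystem.get(
        new URI("wasb://container@account.blob.core.windows.net/"), conf);
    Path src = new Path("/data/part-0000.tmp");
    Path dst = new Path("/data/part-0000");
    if (!fs.rename(src, dst)) {
      // After this patch, rename() returns false when the destination file
      // already exists (BlobAlreadyExists) or the source has disappeared
      // (BlobNotFound), instead of overwriting the destination.
      System.err.println("rename " + src + " -> " + dst + " failed");
    }
  }
}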

NativeAzureFileSystemHelper.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
import java.io.EOFException;
import java.io.IOException;
+ import java.net.HttpURLConnection;
import java.util.Map;
import com.google.common.base.Preconditions;
@@ -95,6 +96,23 @@ final class NativeAzureFileSystemHelper {
return false;
}
+ /*
+ * Determines if a conditional request failed because the blob already
+ * exists.
+ *
+ * @param e - the storage exception thrown by the failed operation.
+ *
+ * @return true if a conditional request failed because the blob already
+ * exists; otherwise, returns false.
+ */
+ static boolean isBlobAlreadyExistsConflict(StorageException e) {
+ if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+ && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+ return true;
+ }
+ return false;
+ }
/*
* Helper method that logs stack traces from all live threads.
*/
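The new helper matches exactly an HTTP 409 response whose storage error code is BlobAlreadyExists. A small sketch of its contract follows; the exception is built the same way MockStorageInterface builds it later in this commit, while the wrapper class and method are hypothetical (the helper is package-private, so such code has to live in org.apache.hadoop.fs.azure).

package org.apache.hadoop.fs.azure;

import java.net.HttpURLConnection;

import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;

// Hypothetical check of the helper's contract.
class BlobAlreadyExistsConflictSketch {
  static boolean demo() {
    StorageException conflict = new StorageException(
        StorageErrorCodeStrings.BLOB_ALREADY_EXISTS, // "BlobAlreadyExists"
        "The blob already exists.",
        HttpURLConnection.HTTP_CONFLICT,             // 409
        null,
        null);
    // true: 409 plus BlobAlreadyExists; any other status or error code yields false.
    return NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(conflict);
  }
}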

NativeFileSystemStore.java

@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
throws IOException;
+ void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination)
+ throws IOException;
/**
* Delete all keys with the given prefix. Used for testing.
*

SecureStorageInterfaceImpl.java

@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override

StorageInterface.java

@@ -406,7 +406,7 @@ abstract class StorageInterface {
*
*/
public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
- BlobRequestOptions options, OperationContext opContext)
+ BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException;
/**

StorageInterfaceImpl.java

@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override

ITestNativeAzureFileSystemLive.java

@@ -18,8 +18,16 @@
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicReference;
+ import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -40,6 +48,70 @@ public class ITestNativeAzureFileSystemLive extends
return AzureBlobStorageTestAccount.create();
}
+ /**
+ * Tests the rename file operation to ensure that when there are multiple
+ * attempts to rename a file to the same destination, only one rename
+ * operation is successful (HADOOP-15086).
+ */
+ @Test
+ public void testMultipleRenameFileOperationsToSameDestination()
+ throws IOException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+ final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
+ final Path dest = path("dest");
+ // Run 10 threads to rename multiple files to the same target path
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ final int threadNumber = i;
+ Path src = path("test" + threadNumber);
+ threads.add(new Thread(() -> {
+ try {
+ latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ try {
+ try (OutputStream output = fs.create(src)) {
+ output.write(("Source file number " + threadNumber).getBytes());
+ }
+ if (fs.rename(src, dest)) {
+ LOG.info("rename succeeded for thread " + threadNumber);
+ successfulRenameCount.incrementAndGet();
+ }
+ } catch (IOException e) {
+ unexpectedError.compareAndSet(null, e);
+ ContractTestUtils.fail("Exception unexpected", e);
+ }
+ }));
+ }
+ // Start each thread
+ threads.forEach(t -> t.start());
+ // Wait for threads to start and wait on latch
+ Thread.sleep(2000);
+ // Now start to rename
+ latch.countDown();
+ // Wait for all threads to complete
+ threads.forEach(t -> {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ }
+ });
+ if (unexpectedError.get() != null) {
+ throw unexpectedError.get();
+ }
+ assertEquals(1, successfulRenameCount.get());
+ LOG.info("Success, only one rename operation succeeded!");
+ }
@Test
public void testLazyRenamePendingCanOverwriteExistingFile()
throws Exception {

MockStorageInterface.java

@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext) throws StorageException, URISyntaxException {
+ OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+ if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+ throw new StorageException("BlobAlreadyExists",
+ "The blob already exists.",
+ HttpURLConnection.HTTP_CONFLICT,
+ null,
+ null);
+ }
backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
//TODO: set the backingStore.properties.CopyState and
// update azureNativeFileSystemStore.waitForCopyToComplete
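With the mock honoring the new flag, the conflict path can be exercised without a live storage account. A rough, hypothetical sketch follows; AzureBlobStorageTestAccount.createMock() and getFileSystem() are assumed from the existing hadoop-azure test utilities, and the paths and test class name are made up.

package org.apache.hadoop.fs.azure;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

// Hypothetical mock-backed test of the non-overwriting file rename.
public class TestRenameConflictOnMockSketch {
  @Test
  public void secondRenameToSameDestinationFails() throws Exception {
    AzureBlobStorageTestAccount account = AzureBlobStorageTestAccount.createMock();
    FileSystem fs = account.getFileSystem();
    Path first = new Path("/first");
    Path second = new Path("/second");
    Path dest = new Path("/dest");
    try (OutputStream out = fs.create(first)) {
      out.write(1);
    }
    try (OutputStream out = fs.create(second)) {
      out.write(2);
    }
    assertTrue("first rename should win", fs.rename(first, dest));
    // The mock now raises 409 BlobAlreadyExists for the conditional copy,
    // which NativeAzureFileSystem maps to a false return value.
    assertFalse("second rename must not overwrite", fs.rename(second, dest));
  }
}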