HADOOP-15086. NativeAzureFileSystem file rename is not atomic.

Contributed by Thomas Marquardt
(Backported to branch-2 via /HADOOP-15156)
This commit is contained in:
Steve Loughran 2018-01-08 15:12:41 +00:00
parent 8e0a5b1514
commit a35267b47a
9 changed files with 182 additions and 17 deletions

View File

@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void rename(String srcKey, String dstKey) throws IOException {
rename(srcKey, dstKey, false, null);
rename(srcKey, dstKey, false, null, true);
}
@Override
public void rename(String srcKey, String dstKey, boolean acquireLease,
SelfRenewingLease existingLease) throws IOException {
SelfRenewingLease existingLease) throws IOException {
rename(srcKey, dstKey, acquireLease, existingLease, true);
}
@Override
public void rename(String srcKey, String dstKey, boolean acquireLease,
SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
LOG.debug("Moving {} to {}", srcKey, dstKey);
@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// a more intensive exponential retry policy when the cluster is getting
// throttled.
try {
dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
dstBlob.startCopyFromBlob(srcBlob, null,
getInstrumentedContext(), overwriteDestination);
} catch (StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
int copyBlobMinBackoff = sessionConfiguration.getInt(
@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
options.setRetryPolicyFactory(new RetryExponentialRetry(
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMaxRetries));
dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
dstBlob.startCopyFromBlob(srcBlob, options,
getInstrumentedContext(), overwriteDestination);
} else {
throw se;
}

View File

@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem {
} else if (!srcMetadata.isDir()) {
LOG.debug("Source {} found as a file, renaming.", src);
try {
store.rename(srcKey, dstKey);
// HADOOP-15086 - file rename must ensure that the destination does
// not exist. The fix is targeted to this call only to avoid
// regressions. Other call sites are attempting to rename temporary
// files, redo a failed rename operation, or rename a directory
// recursively; for these cases the destination may exist.
store.rename(srcKey, dstKey, false, null,
false);
} catch(IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
if (innerException instanceof StorageException) {
if (NativeAzureFileSystemHelper.isFileNotFoundException(
(StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
}
if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
(StorageException) innerException)) {
LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
return false;
}
}
throw ex;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
import java.io.EOFException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Map;
import com.google.common.base.Preconditions;
@ -95,6 +96,23 @@ final class NativeAzureFileSystemHelper {
return false;
}
/*
* Determines if a conditional request failed because the blob already
* exists.
*
* @param e - the storage exception thrown by the failed operation.
*
* @return true if a conditional request failed because the blob already
* exists; otherwise, returns false.
*/
static boolean isBlobAlreadyExistsConflict(StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
&& StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
return true;
}
return false;
}
/*
* Helper method that logs stack traces from all live threads.
*/

View File

@ -91,6 +91,10 @@ interface NativeFileSystemStore {
void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
throws IOException;
void rename(String srcKey, String dstKey, boolean acquireLease,
SelfRenewingLease existingLease, boolean overwriteDestination)
throws IOException;
/**
* Delete all keys with the given prefix. Used for testing.
*

View File

@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
AccessCondition dstAccessCondition =
overwriteDestination
? null
: AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
null, null, options, opContext);
null, dstAccessCondition, options, opContext);
}
@Override

View File

@ -406,7 +406,7 @@ abstract class StorageInterface {
*
*/
public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
BlobRequestOptions options, OperationContext opContext)
BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException;
/**

View File

@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext)
OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
AccessCondition dstAccessCondition =
overwriteDestination
? null
: AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
null, null, options, opContext);
null, dstAccessCondition, options, opContext);
}
@Override

View File

@ -18,8 +18,17 @@
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends
return AzureBlobStorageTestAccount.create();
}
/**
* Implements the thread start routine for the test
* testMultipleRenameFileOperationsToSameDestination.
*/
private static class RenameThread implements Runnable {
private final FileSystem fs;
private final CountDownLatch latch;
private final int threadNumber;
private final Path src;
private final Path dst;
private final AtomicInteger successfulRenameCount;
private final AtomicReference<IOException> unexpectedError;
RenameThread(FileSystem fs,
CountDownLatch latch,
int threadNumber,
Path src,
Path dst,
AtomicInteger successfulRenameCount,
AtomicReference<IOException> unexpectedError) {
this.fs = fs;
this.latch = latch;
this.threadNumber = threadNumber;
this.src = src;
this.dst = dst;
this.successfulRenameCount = successfulRenameCount;
this.unexpectedError = unexpectedError;
}
@Override
public void run() {
try {
latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
try {
try (OutputStream output = fs.create(src)) {
output.write(("Source file number " + threadNumber).getBytes());
}
if (fs.rename(src, dst)) {
LOG.info("rename succeeded for thread " + threadNumber);
successfulRenameCount.incrementAndGet();
}
} catch (IOException e) {
unexpectedError.compareAndSet(null, e);
ContractTestUtils.fail("Exception unexpected", e);
}
}
}
/**
* Tests the rename file operation to ensure that when there are multiple
* attempts to rename a file to the same destination, only one rename
* operation is successful (HADOOP-15086).
*/
@Test
public void testMultipleRenameFileOperationsToSameDestination()
throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger successfulRenameCount = new AtomicInteger(0);
final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
final Path dest = path("dest");
// Run 10 threads to rename multiple files to the same target path
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int threadNumber = i;
Path src = path("test" + threadNumber);
threads.add(new Thread(new RenameThread(fs, latch, threadNumber, src, dest, successfulRenameCount, unexpectedError)));
}
// Start each thread
for (int i = 0; i < threads.size(); i++) {
threads.get(i).start();
}
// Wait for threads to start and wait on latch
Thread.sleep(2000);
// Now start to rename
latch.countDown();
// Wait for all threads to complete
for (int i = 0; i < threads.size(); i++) {
try {
threads.get(i).join();
} catch (InterruptedException e) {
}
}
if (unexpectedError.get() != null) {
throw unexpectedError.get();
}
assertEquals(1, successfulRenameCount.get());
LOG.info("Success, only one rename operation succeeded!");
}
@Test
public void testLazyRenamePendingCanOverwriteExistingFile()
throws Exception {

View File

@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
OperationContext opContext) throws StorageException, URISyntaxException {
OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
throw new StorageException("BlobAlreadyExists",
"The blob already exists.",
HttpURLConnection.HTTP_CONFLICT,
null,
null);
}
backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
//TODO: set the backingStore.properties.CopyState and
// update azureNativeFileSystemStore.waitForCopyToComplete