diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 730373cb45b..7111dcf802d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public void rename(String srcKey, String dstKey) throws IOException { - rename(srcKey, dstKey, false, null); + rename(srcKey, dstKey, false, null, true); } @Override public void rename(String srcKey, String dstKey, boolean acquireLease, - SelfRenewingLease existingLease) throws IOException { + SelfRenewingLease existingLease) throws IOException { + rename(srcKey, dstKey, acquireLease, existingLease, true); + } + + @Override + public void rename(String srcKey, String dstKey, boolean acquireLease, + SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException { LOG.debug("Moving {} to {}", srcKey, dstKey); @@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // a more intensive exponential retry policy when the cluster is getting // throttled. try { - dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext()); + dstBlob.startCopyFromBlob(srcBlob, null, + getInstrumentedContext(), overwriteDestination); } catch (StorageException se) { if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { int copyBlobMinBackoff = sessionConfiguration.getInt( @@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { options.setRetryPolicyFactory(new RetryExponentialRetry( copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff, copyBlobMaxRetries)); - dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext()); + dstBlob.startCopyFromBlob(srcBlob, options, + getInstrumentedContext(), overwriteDestination); } else { throw se; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index d89a523ffd2..af428493bd9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem { } else if (!srcMetadata.isDir()) { LOG.debug("Source {} found as a file, renaming.", src); try { - store.rename(srcKey, dstKey); + // HADOOP-15086 - file rename must ensure that the destination does + // not exist. The fix is targeted to this call only to avoid + // regressions. Other call sites are attempting to rename temporary + // files, redo a failed rename operation, or rename a directory + // recursively; for these cases the destination may exist. + store.rename(srcKey, dstKey, false, null, + false); } catch(IOException ex) { - Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); - if (innerException instanceof StorageException - && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { - - LOG.debug("BlobNotFoundException encountered. Failing rename", src); - return false; + if (innerException instanceof StorageException) { + if (NativeAzureFileSystemHelper.isFileNotFoundException( + (StorageException) innerException)) { + LOG.debug("BlobNotFoundException encountered. Failing rename", src); + return false; + } + if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict( + (StorageException) innerException)) { + LOG.debug("Destination BlobAlreadyExists. Failing rename", src); + return false; + } } throw ex; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java index 57af1f834d4..754f3431426 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure; import java.io.EOFException; import java.io.IOException; +import java.net.HttpURLConnection; import java.util.Map; import com.google.common.base.Preconditions; @@ -95,6 +96,23 @@ final class NativeAzureFileSystemHelper { return false; } + /* + * Determines if a conditional request failed because the blob already + * exists. + * + * @param e - the storage exception thrown by the failed operation. + * + * @return true if a conditional request failed because the blob already + * exists; otherwise, returns false. + */ + static boolean isBlobAlreadyExistsConflict(StorageException e) { + if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT + && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) { + return true; + } + return false; + } + /* * Helper method that logs stack traces from all live threads. */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 57a729dc155..b67ab1b297b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -91,6 +91,10 @@ interface NativeFileSystemStore { void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease) throws IOException; + void rename(String srcKey, String dstKey, boolean acquireLease, + SelfRenewingLease existingLease, boolean overwriteDestination) + throws IOException; + /** * Delete all keys with the given prefix. Used for testing. * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java index 7c2722ed13b..0f54249e833 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java @@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + AccessCondition dstAccessCondition = + overwriteDestination + ? null + : AccessCondition.generateIfNotExistsCondition(); getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(), - null, null, options, opContext); + null, dstAccessCondition, options, opContext); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index e03d7311e23..dbb38491d7f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -406,7 +406,7 @@ abstract class StorageInterface { * */ public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob, - BlobRequestOptions options, OperationContext opContext) + BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 41a4dbb159d..e600f9e59da 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + AccessCondition dstAccessCondition = + overwriteDestination + ? null + : AccessCondition.generateIfNotExistsCondition(); getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(), - null, null, options, opContext); + null, dstAccessCondition, options, opContext); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java index f969968110e..702ad66a7bc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java @@ -18,8 +18,17 @@ package org.apache.hadoop.fs.azure; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends return AzureBlobStorageTestAccount.create(); } + /** + * Implements the thread start routine for the test + * testMultipleRenameFileOperationsToSameDestination. + */ + private static class RenameThread implements Runnable { + + private final FileSystem fs; + private final CountDownLatch latch; + private final int threadNumber; + private final Path src; + private final Path dst; + private final AtomicInteger successfulRenameCount; + private final AtomicReference unexpectedError; + + RenameThread(FileSystem fs, + CountDownLatch latch, + int threadNumber, + Path src, + Path dst, + AtomicInteger successfulRenameCount, + AtomicReference unexpectedError) { + this.fs = fs; + this.latch = latch; + this.threadNumber = threadNumber; + this.src = src; + this.dst = dst; + this.successfulRenameCount = successfulRenameCount; + this.unexpectedError = unexpectedError; + } + + @Override + public void run() { + try { + latch.await(Long.MAX_VALUE, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + try { + try (OutputStream output = fs.create(src)) { + output.write(("Source file number " + threadNumber).getBytes()); + } + + if (fs.rename(src, dst)) { + LOG.info("rename succeeded for thread " + threadNumber); + successfulRenameCount.incrementAndGet(); + } + } catch (IOException e) { + unexpectedError.compareAndSet(null, e); + ContractTestUtils.fail("Exception unexpected", e); + } + } + } + + /** + * Tests the rename file operation to ensure that when there are multiple + * attempts to rename a file to the same destination, only one rename + * operation is successful (HADOOP-15086). + */ + @Test + public void testMultipleRenameFileOperationsToSameDestination() + throws IOException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger successfulRenameCount = new AtomicInteger(0); + final AtomicReference unexpectedError = new AtomicReference(); + final Path dest = path("dest"); + + // Run 10 threads to rename multiple files to the same target path + List threads = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + final int threadNumber = i; + Path src = path("test" + threadNumber); + threads.add(new Thread(new RenameThread(fs, latch, threadNumber, src, dest, successfulRenameCount, unexpectedError))); + } + + // Start each thread + for (int i = 0; i < threads.size(); i++) { + threads.get(i).start(); + } + + // Wait for threads to start and wait on latch + Thread.sleep(2000); + + // Now start to rename + latch.countDown(); + + // Wait for all threads to complete + for (int i = 0; i < threads.size(); i++) { + try { + threads.get(i).join(); + } catch (InterruptedException e) { + } + } + + if (unexpectedError.get() != null) { + throw unexpectedError.get(); + } + assertEquals(1, successfulRenameCount.get()); + LOG.info("Success, only one rename operation succeeded!"); + } + @Test public void testLazyRenamePendingCanOverwriteExistingFile() throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index e0ae7b4f195..d5f6437d960 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface { @Override public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options, - OperationContext opContext) throws StorageException, URISyntaxException { + OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException { + if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) { + throw new StorageException("BlobAlreadyExists", + "The blob already exists.", + HttpURLConnection.HTTP_CONFLICT, + null, + null); + } backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri)); //TODO: set the backingStore.properties.CopyState and // update azureNativeFileSystemStore.waitForCopyToComplete