diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 48aa7820f5e..af68844dd27 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -702,6 +702,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API. (Dushyanth via cnauroth) + HADOOP-12508. delete fails with exception when lease is held on blob. + (Gaurav Kanade via cnauroth) + OPTIMIZATIONS HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString() diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 6412714ff99..69ece4a4278 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2370,7 +2370,37 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public void delete(String key) throws IOException { - delete(key, null); + try { + delete(key, null); + } catch (IOException e) { + Throwable t = e.getCause(); + if(t != null && t instanceof StorageException) { + StorageException se = (StorageException) t; + if(se.getErrorCode().equals(("LeaseIdMissing"))){ + SelfRenewingLease lease = null; + try { + lease = acquireLease(key); + delete(key, lease); + } catch (AzureException e3) { + LOG.warn("Got unexpected exception trying to acquire lease on " + + key + "." + e3.getMessage()); + throw e3; + } finally { + try { + if(lease != null){ + lease.free(); + } + } catch (Exception e4){ + LOG.error("Unable to free lease on " + key, e4); + } + } + } else { + throw e; + } + } else { + throw e; + } + } } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java index 06f32ce3618..900d7301bf9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; +import com.google.common.annotations.VisibleForTesting; + import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlob; @@ -61,7 +63,8 @@ public class SelfRenewingLease { // Time to wait to retry getting the lease in milliseconds - private static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000; + @VisibleForTesting + static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000; public SelfRenewingLease(CloudBlobWrapper blobWrapper) throws StorageException { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemLive.java index b033460d3bd..721cb5ff802 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemLive.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemLive.java @@ -21,10 +21,16 @@ package org.apache.hadoop.fs.azure; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; + import org.junit.Test; +import com.microsoft.azure.storage.StorageException; + /* * Tests the Native Azure file system (WASB) against an actual blob store if * provided in the environment. @@ -37,6 +43,86 @@ public class TestNativeAzureFileSystemLive extends return AzureBlobStorageTestAccount.create(); } + /** + * Tests fs.delete() function to delete a blob when another blob is holding a + * lease on it. Delete if called without a lease should fail if another process + * is holding a lease and throw appropriate exception + * This is a scenario that would happen in HMaster startup when it tries to + * clean up the temp dirs while the HMaster process which was killed earlier + * held lease on the blob when doing some DDL operation + */ + @Test + public void testDeleteThrowsExceptionWithLeaseExistsErrorMessage() + throws Exception { + LOG.info("Starting test"); + final String FILE_KEY = "fileWithLease"; + // Create the file + Path path = new Path(FILE_KEY); + fs.create(path); + assertTrue(fs.exists(path)); + NativeAzureFileSystem nfs = (NativeAzureFileSystem)fs; + final String fullKey = nfs.pathToKey(nfs.makeAbsolute(path)); + final AzureNativeFileSystemStore store = nfs.getStore(); + + // Acquire the lease on the file in a background thread + final CountDownLatch leaseAttemptComplete = new CountDownLatch(1); + final CountDownLatch beginningDeleteAttempt = new CountDownLatch(1); + Thread t = new Thread() { + @Override + public void run() { + // Acquire the lease and then signal the main test thread. + SelfRenewingLease lease = null; + try { + lease = store.acquireLease(fullKey); + LOG.info("Lease acquired: " + lease.getLeaseID()); + } catch (AzureException e) { + LOG.warn("Lease acqusition thread unable to acquire lease", e); + } finally { + leaseAttemptComplete.countDown(); + } + + // Wait for the main test thread to signal it will attempt the delete. + try { + beginningDeleteAttempt.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Keep holding the lease past the lease acquisition retry interval, so + // the test covers the case of delete retrying to acquire the lease. + try { + Thread.sleep(SelfRenewingLease.LEASE_ACQUIRE_RETRY_INTERVAL * 3); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + try { + if (lease != null){ + LOG.info("Freeing lease"); + lease.free(); + } + } catch (StorageException se) { + LOG.warn("Unable to free lease.", se); + } + } + }; + + // Start the background thread and wait for it to signal the lease is held. + t.start(); + try { + leaseAttemptComplete.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + // Try to delete the same file + beginningDeleteAttempt.countDown(); + store.delete(fullKey); + + // At this point file SHOULD BE DELETED + assertFalse(fs.exists(path)); + } + /** * Check that isPageBlobKey works as expected. This assumes that * in the test configuration, the list of supported page blob directories