diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index eaca82e5bb9..dc4959650bf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2045,10 +2045,10 @@ public PartialListing listAll(String prefix, final int maxListingCount, * The key to search for. * @return The wanted directory, or null if not found. */ - private static FileMetadata getDirectoryInList( + private static FileMetadata getFileMetadataInList( final Iterable list, String key) { for (FileMetadata current : list) { - if (current.isDir() && current.getKey().equals(key)) { + if (current.getKey().equals(key)) { return current; } } @@ -2114,7 +2114,7 @@ private PartialListing list(String prefix, String delimiter, // Add the metadata to the list, but remove any existing duplicate // entries first that we may have added by finding nested files. - FileMetadata existing = getDirectoryInList(fileMetadata, blobKey); + FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey); if (existing != null) { fileMetadata.remove(existing); } @@ -2141,7 +2141,7 @@ private PartialListing list(String prefix, String delimiter, // Add the directory metadata to the list only if it's not already // there. - if (getDirectoryInList(fileMetadata, dirKey) == null) { + if (getFileMetadataInList(fileMetadata, dirKey) == null) { fileMetadata.add(directoryMetadata); } @@ -2249,7 +2249,7 @@ private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory, // Add the directory metadata to the list only if it's not already // there. - FileMetadata existing = getDirectoryInList(aFileMetadataList, blobKey); + FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey); if (existing != null) { aFileMetadataList.remove(existing); } @@ -2278,7 +2278,7 @@ private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory, // absolute path is being used or not. String dirKey = normalizeKey(directory); - if (getDirectoryInList(aFileMetadataList, dirKey) == null) { + if (getFileMetadataInList(aFileMetadataList, dirKey) == null) { // Reached the targeted listing depth. Return metadata for the // directory using default permissions. // @@ -2376,18 +2376,24 @@ private void safeDelete(CloudBlobWrapper blob, SelfRenewingLease lease) throws S } } + /** + * API implementation to delete a blob in the back end azure storage. + */ @Override - public void delete(String key, SelfRenewingLease lease) throws IOException { + public boolean delete(String key, SelfRenewingLease lease) throws IOException { try { if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) { // Container doesn't exist, no need to do anything - return; + return true; } // Get the blob reference and delete it. CloudBlobWrapper blob = getBlobReference(key); if (blob.exists(getInstrumentedContext())) { safeDelete(blob, lease); + return true; + } else { + return false; } } catch (Exception e) { // Re-throw as an Azure storage exception. @@ -2395,10 +2401,13 @@ public void delete(String key, SelfRenewingLease lease) throws IOException { } } + /** + * API implementation to delete a blob in the back end azure storage. + */ @Override - public void delete(String key) throws IOException { + public boolean delete(String key) throws IOException { try { - delete(key, null); + return delete(key, null); } catch (IOException e) { Throwable t = e.getCause(); if(t != null && t instanceof StorageException) { @@ -2407,7 +2416,7 @@ public void delete(String key) throws IOException { SelfRenewingLease lease = null; try { lease = acquireLease(key); - delete(key, lease); + return delete(key, lease); } catch (AzureException e3) { LOG.warn("Got unexpected exception trying to acquire lease on " + key + "." + e3.getMessage()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index fb0d31f330b..161da519618 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -1765,8 +1765,11 @@ public boolean delete(Path f, boolean recursive, } try { - store.delete(key); - instrumentation.fileDeleted(); + if (store.delete(key)) { + instrumentation.fileDeleted(); + } else { + return false; + } } catch(IOException e) { Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); @@ -1885,7 +1888,7 @@ public boolean execute(FileMetadata file) throws IOException{ } // Delete the current directory - if (!deleteFile(metaFile.getKey(), metaFile.isDir())) { + if (store.retrieveMetadata(metaFile.getKey()) != null && !deleteFile(metaFile.getKey(), metaFile.isDir())) { LOG.error("Failed delete directory {}", f.toString()); return false; } @@ -1913,11 +1916,15 @@ public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount, @VisibleForTesting boolean deleteFile(String key, boolean isDir) throws IOException { try { - store.delete(key); - if (isDir) { - instrumentation.directoryDeleted(); + if (store.delete(key)) { + if (isDir) { + instrumentation.directoryDeleted(); + } else { + instrumentation.fileDeleted(); + } + return true; } else { - instrumentation.fileDeleted(); + return false; } } catch(IOException e) { Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); @@ -1929,8 +1936,6 @@ boolean deleteFile(String key, boolean isDir) throws IOException { throw e; } - - return true; } @Override @@ -2790,6 +2795,8 @@ void handleFile(FileMetadata file, FileMetadata tempFile) throws IOException { LOG.debug("Deleting dangling file {}", file.getKey()); + // Not handling delete return type as false return essentially + // means its a no-op for the caller store.delete(file.getKey()); store.delete(tempFile.getKey()); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index acdd3d6c077..454a5df8658 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -74,7 +74,16 @@ PartialListing listAll(String prefix, final int maxListingCount, void changePermissionStatus(String key, PermissionStatus newPermission) throws AzureException; - void delete(String key) throws IOException; + /** + * API to delete a blob in the back end azure storage. + * @param key - key to the blob being deleted. + * @return return true when delete is successful, false if + * blob cannot be found or delete is not possible without + * exception. + * @throws IOException Exception encountered while deleting in + * azure storage. + */ + boolean delete(String key) throws IOException; void rename(String srcKey, String dstKey) throws IOException; @@ -104,7 +113,17 @@ void updateFolderLastModifiedTime(String key, SelfRenewingLease folderLease) void updateFolderLastModifiedTime(String key, Date lastModified, SelfRenewingLease folderLease) throws AzureException; - void delete(String key, SelfRenewingLease lease) throws IOException; + /** + * API to delete a blob in the back end azure storage. + * @param key - key to the blob being deleted. + * @param lease - Active lease on the blob. + * @return return true when delete is successful, false if + * blob cannot be found or delete is not possible without + * exception. + * @throws IOException Exception encountered while deleting in + * azure storage. + */ + boolean delete(String key, SelfRenewingLease lease) throws IOException; SelfRenewingLease acquireLease(String key) throws AzureException; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java new file mode 100644 index 00000000000..ec72ccec0a5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +/*** + * Test class to hold all Live Azure storage concurrency tests. + */ +public class TestNativeAzureFileSystemConcurrencyLive + extends AbstractWasbTestBase { + + private static final int TEST_COUNT = 102; + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + /** + * Test multi-threaded deletes in WASB. Expected behavior is one of the thread + * should be to successfully delete the file and return true and all other + * threads need to return false. + */ + @Test + public void testMultiThreadedDeletes() throws Exception { + Path testFile = new Path("test.dat"); + fs.create(testFile).close(); + + int threadCount = TEST_COUNT; + DeleteHelperThread[] helperThreads = new DeleteHelperThread[threadCount]; + + for (int i = 0; i < threadCount; i++) { + helperThreads[i] = new DeleteHelperThread(fs, testFile); + } + + Thread[] threads = new Thread[threadCount]; + + for (int i = 0; i < threadCount; i++) { + threads[i] = new Thread(helperThreads[i]); + threads[i].start(); + } + + for (int i = 0; i < threadCount; i++) { + threads[i].join(); + } + + boolean deleteSuccess = false; + + for (int i = 0; i < threadCount; i++) { + + Assert.assertFalse("child thread has exception : " + helperThreads[i].getException(), + helperThreads[i].getExceptionEncounteredFlag()); + + if (deleteSuccess) { + Assert.assertFalse("More than one thread delete() retuhelperThreads[i].getDeleteSuccess()", + helperThreads[i].getExceptionEncounteredFlag()); + } else { + deleteSuccess = helperThreads[i].getDeleteSuccess(); + } + } + + Assert.assertTrue("No successfull delete found", deleteSuccess); + } +} + +class DeleteHelperThread implements Runnable { + + private FileSystem fs; + private Path p; + private boolean deleteSuccess; + private boolean exceptionEncountered; + private Exception ex; + + public DeleteHelperThread(FileSystem fs, Path p) { + this.fs = fs; + this.p = p; + } + + public void run() { + try { + deleteSuccess = fs.delete(p, false); + } catch (Exception ioEx) { + exceptionEncountered = true; + this.ex = ioEx; + } + } + + public boolean getDeleteSuccess() { + return deleteSuccess; + } + + public boolean getExceptionEncounteredFlag() { + return exceptionEncountered; + } + + public Exception getException() { + return ex; + } +} \ No newline at end of file