From 9374f3882044b552b7dbde788ce569452072c6dc Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 29 Aug 2017 19:03:44 +0100 Subject: [PATCH] HADOOP-14583. wasb throws an exception if you try to create a file and there's no parent directory Contributed by Esfandiar Manii and Thomas Marquardt. --- .../fs/azure/AzureNativeFileSystemStore.java | 36 ++-- ...tNativeAzureFileSystemConcurrencyLive.java | 173 ++++++++++++------ 2 files changed, 140 insertions(+), 69 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index b0cd701b9e2..bd8ac686e3d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -2027,24 +2027,30 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { LOG.debug("Found {} as an explicit blob. Checking if it's a file or folder.", key); - // The blob exists, so capture the metadata from the blob - // properties. - blob.downloadAttributes(getInstrumentedContext()); - BlobProperties properties = blob.getProperties(); + try { + // The blob exists, so capture the metadata from the blob + // properties. + blob.downloadAttributes(getInstrumentedContext()); + BlobProperties properties = blob.getProperties(); - if (retrieveFolderAttribute(blob)) { - LOG.debug("{} is a folder blob.", key); - return new FileMetadata(key, properties.getLastModified().getTime(), - getPermissionStatus(blob), BlobMaterialization.Explicit); - } else { + if (retrieveFolderAttribute(blob)) { + LOG.debug("{} is a folder blob.", key); + return new FileMetadata(key, properties.getLastModified().getTime(), + getPermissionStatus(blob), BlobMaterialization.Explicit); + } else { - LOG.debug("{} is a normal blob.", key); + LOG.debug("{} is a normal blob.", key); - return new FileMetadata( - key, // Always return denormalized key with metadata. - getDataLength(blob, properties), - properties.getLastModified().getTime(), - getPermissionStatus(blob)); + return new FileMetadata( + key, // Always return denormalized key with metadata. + getDataLength(blob, properties), + properties.getLastModified().getTime(), + getPermissionStatus(blob)); + } + } catch(StorageException e){ + if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) { + throw e; + } } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java index ec72ccec0a5..7c5899d66bf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemConcurrencyLive.java @@ -19,101 +19,166 @@ package org.apache.hadoop.fs.azure; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + /*** * Test class to hold all Live Azure storage concurrency tests. */ public class TestNativeAzureFileSystemConcurrencyLive extends AbstractWasbTestBase { - private static final int TEST_COUNT = 102; + private static final int THREAD_COUNT = 102; + private static final int TEST_EXECUTION_TIMEOUT = 5000; @Override protected AzureBlobStorageTestAccount createTestAccount() throws Exception { return AzureBlobStorageTestAccount.create(); } /** - * Test multi-threaded deletes in WASB. Expected behavior is one of the thread - * should be to successfully delete the file and return true and all other - * threads need to return false. + * Validate contract for FileSystem.create when overwrite is true and there + * are concurrent callers of FileSystem.delete. An existing file should be + * overwritten, even if the original destination exists but is deleted by an + * external agent during the create operation. */ - @Test - public void testMultiThreadedDeletes() throws Exception { + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testConcurrentCreateDeleteFile() throws Exception { + Path testFile = new Path("test.dat"); + + List tasks = new ArrayList<>(THREAD_COUNT); + + for (int i = 0; i < THREAD_COUNT; i++) { + tasks.add(new CreateFileTask(fs, testFile)); + } + + ExecutorService es = null; + + try { + es = Executors.newFixedThreadPool(THREAD_COUNT); + + List> futures = es.invokeAll(tasks); + + for (Future future : futures) { + Assert.assertTrue(future.isDone()); + + // we are using Callable, so if an exception + // occurred during the operation, it will be thrown + // when we call get + Assert.assertEquals(null, future.get()); + } + } finally { + if (es != null) { + es.shutdownNow(); + } + } + } + + /** + * Validate contract for FileSystem.delete when invoked concurrently. + * One of the threads should successfully delete the file and return true; + * all other threads should return false. + */ + @Test(timeout = TEST_EXECUTION_TIMEOUT) + public void testConcurrentDeleteFile() throws Exception { Path testFile = new Path("test.dat"); fs.create(testFile).close(); - int threadCount = TEST_COUNT; - DeleteHelperThread[] helperThreads = new DeleteHelperThread[threadCount]; + List tasks = new ArrayList<>(THREAD_COUNT); - for (int i = 0; i < threadCount; i++) { - helperThreads[i] = new DeleteHelperThread(fs, testFile); + for (int i = 0; i < THREAD_COUNT; i++) { + tasks.add(new DeleteFileTask(fs, testFile)); } - Thread[] threads = new Thread[threadCount]; + ExecutorService es = null; + try { + es = Executors.newFixedThreadPool(THREAD_COUNT); - for (int i = 0; i < threadCount; i++) { - threads[i] = new Thread(helperThreads[i]); - threads[i].start(); - } + List> futures = es.invokeAll(tasks); - for (int i = 0; i < threadCount; i++) { - threads[i].join(); - } + int successCount = 0; + for (Future future : futures) { + Assert.assertTrue(future.isDone()); - boolean deleteSuccess = false; + // we are using Callable, so if an exception + // occurred during the operation, it will be thrown + // when we call get + Boolean success = future.get(); + if (success) { + successCount++; + } + } - for (int i = 0; i < threadCount; i++) { - - Assert.assertFalse("child thread has exception : " + helperThreads[i].getException(), - helperThreads[i].getExceptionEncounteredFlag()); - - if (deleteSuccess) { - Assert.assertFalse("More than one thread delete() retuhelperThreads[i].getDeleteSuccess()", - helperThreads[i].getExceptionEncounteredFlag()); - } else { - deleteSuccess = helperThreads[i].getDeleteSuccess(); + Assert.assertEquals( + "Exactly one delete operation should return true.", + 1, + successCount); + } finally { + if (es != null) { + es.shutdownNow(); } } - - Assert.assertTrue("No successfull delete found", deleteSuccess); } } -class DeleteHelperThread implements Runnable { +abstract class FileSystemTask implements Callable { + private final FileSystem fileSystem; + private final Path path; - private FileSystem fs; - private Path p; - private boolean deleteSuccess; - private boolean exceptionEncountered; - private Exception ex; - - public DeleteHelperThread(FileSystem fs, Path p) { - this.fs = fs; - this.p = p; + protected FileSystem getFileSystem() { + return this.fileSystem; } - public void run() { - try { - deleteSuccess = fs.delete(p, false); - } catch (Exception ioEx) { - exceptionEncountered = true; - this.ex = ioEx; - } + protected Path getFilePath() { + return this.path; } - public boolean getDeleteSuccess() { - return deleteSuccess; + FileSystemTask(FileSystem fs, Path p) { + this.fileSystem = fs; + this.path = p; } - public boolean getExceptionEncounteredFlag() { - return exceptionEncountered; + public abstract V call() throws Exception; +} + +class DeleteFileTask extends FileSystemTask { + + DeleteFileTask(FileSystem fs, Path p) { + super(fs, p); } - public Exception getException() { - return ex; + @Override + public Boolean call() throws Exception { + return this.getFileSystem().delete(this.getFilePath(), false); + } +} + +class CreateFileTask extends FileSystemTask { + CreateFileTask(FileSystem fs, Path p) { + super(fs, p); + } + + public Void call() throws Exception { + FileSystem fs = getFileSystem(); + Path p = getFilePath(); + + // Create an empty file and close the stream. + FSDataOutputStream stream = fs.create(p, true); + stream.close(); + + // Delete the file. We don't care if delete returns true or false. + // We just want to ensure the file does not exist. + this.getFileSystem().delete(this.getFilePath(), false); + + return null; } } \ No newline at end of file