diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor.java new file mode 100644 index 00000000000..a9be8c5a75e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.IOException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +class AzureFileSystemThreadPoolExecutor { + + public static final Logger LOG = LoggerFactory.getLogger(AzureFileSystemThreadPoolExecutor.class); + + /* + * Number of threads to keep in the pool. 
+ */ + private int threadCount; + + /* + * Prefix to be used for naming threads. + */ + private String threadNamePrefix; + + /* + * File system operation like delete/rename. Used for logging purpose. + */ + private String operation; + + /* + * Source blob key used for file operation. Used for logging purpose. + */ + private String key; + + /* + * Configuration name for recommendations. Used for logging purpose. + */ + private String config; + + /** + * Creates a new AzureFileSystemThreadPoolExecutor object. + * + * @param threadCount + * Number of threads to be used after reading user configuration. + * @param threadNamePrefix + * Prefix to be used to name threads for the file operation. + * @param operation + * File system operation like delete/rename. Used for logging purpose. + * @param key + * Source blob key used for file operation. Used for logging purpose. + * @param config + * Configuration name for recommendations. Used for logging purpose. + */ + public AzureFileSystemThreadPoolExecutor(int threadCount, String threadNamePrefix, + String operation, String key, String config) { + this.threadCount = threadCount; + this.threadNamePrefix = threadNamePrefix; + this.operation = operation; + this.key = key; + this.config = config; + } + + /** + * Gets a new thread pool + * @param threadCount + * Number of threads to keep in the pool. + * @param threadNamePrefix + * Prefix to be used for naming threads. + * + * @return + * Returns a new thread pool. + */ + @VisibleForTesting + ThreadPoolExecutor getThreadPool(int threadCount) throws Exception { + return new ThreadPoolExecutor(threadCount, threadCount, 2, TimeUnit.SECONDS, + new LinkedBlockingQueue(), new AzureFileSystemThreadFactory(this.threadNamePrefix)); + } + + /** + * Execute the file operation parallel using threads. All threads works on a + * single working set of files stored in input 'contents'. 
The synchronization + * between multiple threads is achieved by retrieving an atomic index value + * for the array. Once a thread gets the index, it retrieves the file and initiates + * the file operation. The advantage of this method is that file operations + * don't get serialized due to any thread. Also, the input copy is not changed, + * so the caller can reuse the list for other purposes. + * + * This implementation also considers the failure of an operation on a single file + * to be an overall operation failure. All threads bail out of their execution + * as soon as they detect that any single thread either got an exception or had its operation fail. + * + * @param contents + * List of blobs on which the operation is to be done. + * @param threadOperation + * The actual operation to be executed by each thread on a file. + * + * @return + * Returns true if the operation succeeded, false if the operation failed. + * @throws IOException + * + */ + boolean executeParallel(FileMetadata[] contents, AzureFileSystemThreadTask threadOperation) throws IOException { + + boolean operationStatus = false; + boolean threadsEnabled = false; + int threadCount = this.threadCount; + ThreadPoolExecutor ioThreadPool = null; + + // Start time for file operation + long start = Time.monotonicNow(); + + // If the number of files is less than the thread count, reduce threads to the file count. + threadCount = Math.min(contents.length, threadCount); + + if (threadCount > 1) { + try { + ioThreadPool = getThreadPool(threadCount); + threadsEnabled = true; + } catch(Exception e) { + // The possibility of this scenario is very remote. Added this code as a safety net. + LOG.warn("Failed to create thread pool with threads {} for operation {} on blob {}." + + " Use config {} to set less number of threads. 
Setting config value to <= 1 will disable threads.", + threadCount, operation, key, config); + } + } else { + LOG.warn("Disabling threads for {} operation as thread count {} is <= 1", operation, threadCount); + } + + if (threadsEnabled) { + LOG.debug("Using thread pool for {} operation with threads {}", operation, threadCount); + boolean started = false; + AzureFileSystemThreadRunnable runnable = new AzureFileSystemThreadRunnable(contents, threadOperation, operation); + + // Don't start any new requests if there is an exception from any one thread. + for (int i = 0; i < threadCount && runnable.lastException == null && runnable.operationStatus; i++) + { + try { + ioThreadPool.execute(runnable); + started = true; + } catch (RejectedExecutionException ex) { + // If threads can't be scheduled then report error and move ahead with next thread. + // Don't fail operation due to this issue. + LOG.error("Rejected execution of thread for {} operation on blob {}." + + " Continuing with existing threads. Use config {} to set less number of threads" + + " to avoid this error", operation, key, config); + } + } + + // Stop accepting any new execute requests. + ioThreadPool.shutdown(); + + try { + // Wait for threads to terminate. Keep time out as large value + ioThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + } catch(InterruptedException intrEx) { + // If current thread got interrupted then shutdown all threads now. + ioThreadPool.shutdownNow(); + + // Restore the interrupted status + Thread.currentThread().interrupt(); + LOG.error("Threads got interrupted {} blob operation for {} " + , operation, key); + } + + int threadsNotUsed = threadCount - runnable.threadsUsed.get(); + if (threadsNotUsed > 0) { + LOG.warn("{} threads not used for {} operation on blob {}", threadsNotUsed, operation, key); + } + + if (!started) { + // No threads started. Fall back to serial mode. + threadsEnabled = false; + LOG.info("Not able to schedule threads to {} blob {}. 
Fall back to {} blob serially." + , operation, key, operation); + } else { + IOException lastException = runnable.lastException; + + // There are no exceptions from threads and no operation failures. Consider this scenario + // as failure only if file operations are not done on all files. + if (lastException == null && runnable.operationStatus && runnable.filesProcessed.get() < contents.length) { + LOG.error("{} failed as operation on subfolders and files failed.", operation); + lastException = new IOException(operation + " failed as operation on subfolders and files failed."); + } + + if (lastException != null) { + // Threads started and executed. One or more threads seems to have hit exception. + // Raise the same exception. + throw lastException; + } + + operationStatus = runnable.operationStatus; + } + } + + if (!threadsEnabled) { + // No threads. Serialize the operation. Clear any last exceptions. + LOG.debug("Serializing the {} operation", operation); + for (int i = 0; i < contents.length; i++) { + if (!threadOperation.execute(contents[i])) { + LOG.warn("Failed to {} file {}", operation, contents[i]); + return false; + } + } + + // Operation is success + operationStatus = true; + } + + // Find the duration of time taken for file operation + long end = Time.monotonicNow(); + LOG.info("Time taken for {} operation is: {} ms with threads: {}", operation, (end - start), threadCount); + + return operationStatus; + } + + /** + * A ThreadFactory for Azure File operation threads with meaningful names helpful + * for debugging purposes. + */ + static class AzureFileSystemThreadFactory implements ThreadFactory { + + private String threadIdPrefix = "AzureFileSystemThread"; + + /** + * Atomic integer to provide thread id for thread names. 
+ */ + private AtomicInteger threadSequenceNumber = new AtomicInteger(0); + + public AzureFileSystemThreadFactory(String prefix) { + threadIdPrefix = prefix; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + + // Use current thread name as part in naming thread such that use of + // same file system object will have unique names. + t.setName(String.format("%s-%s-%d", threadIdPrefix, Thread.currentThread().getName(), + threadSequenceNumber.getAndIncrement())); + return t; + } + + } + + static class AzureFileSystemThreadRunnable implements Runnable { + + // Tracks if any thread has raised exception. + private volatile IOException lastException = null; + + // Tracks if any thread has failed execution. + private volatile boolean operationStatus = true; + + // Atomic tracker to retrieve index of next file to be processed + private AtomicInteger fileIndex = new AtomicInteger(0); + + // Atomic tracker to count number of files successfully processed + private AtomicInteger filesProcessed = new AtomicInteger(0); + + // Atomic tracker to retrieve number of threads used to do at least one file operation. + private AtomicInteger threadsUsed = new AtomicInteger(0); + + // Type of file system operation + private String operation = "Unknown"; + + // List of files to be processed. + private final FileMetadata[] files; + + // Thread task which encapsulates the file system operation work on a file. + private AzureFileSystemThreadTask task; + + public AzureFileSystemThreadRunnable(final FileMetadata[] files, + AzureFileSystemThreadTask task, String operation) { + this.operation = operation; + this.files = files; + this.task = task; + } + + @Override + public void run() { + long start = Time.monotonicNow(); + int currentIndex; + int processedFilesCount = 0; + + while ((currentIndex = fileIndex.getAndIncrement()) < files.length) { + processedFilesCount++; + FileMetadata file = files[currentIndex]; + + try { + // Execute the file operation. 
+ if (!task.execute(file)) { + LOG.error("{} operation failed for file {}", + this.operation, file.getKey()); + operationStatus = false; + } else { + filesProcessed.getAndIncrement(); + } + } catch (Exception e) { + LOG.error("Encountered Exception for {} operation for file {}", + this.operation, file.getKey()); + lastException = new IOException("Encountered Exception for " + + this.operation + " operation for file " + file.getKey(), e); + } + + // If any thread has seen exception or operation failed then we + // don't have to process further. + if (lastException != null || !operationStatus) { + LOG.warn("Terminating execution of {} operation now as some other thread" + + " already got exception or operation failed", this.operation, file.getKey()); + break; + } + } + + long end = Time.monotonicNow(); + LOG.debug("Time taken to process {} files count for {} operation: {} ms", + processedFilesCount, this.operation, (end - start)); + if (processedFilesCount > 0) { + threadsUsed.getAndIncrement(); + } + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadTask.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadTask.java new file mode 100644 index 00000000000..f5180900b9d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadTask.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.IOException; + +/** + * Interface for executing the file operation by a thread. + */ +public interface AzureFileSystemThreadTask { + // Execute the operation on the file. + boolean execute(FileMetadata file) throws IOException; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index cb3e9f65188..eaca82e5bb9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -181,6 +181,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { public static final String KEY_ATOMIC_RENAME_DIRECTORIES = "fs.azure.atomic.rename.dir"; + /** + * Configuration key to enable flat listing of blobs. This config is useful + * only if listing depth is AZURE_UNBOUNDED_DEPTH. + */ + public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable"; + /** * The set of directories where we should apply atomic folder rename * synchronized with createNonRecursive. @@ -224,6 +230,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90; + /** + * Enable flat listing of blobs as default option. This is useful only if + * listing depth is AZURE_UNBOUNDED_DEPTH. 
+ */ + public static final boolean DEFAULT_ENABLE_FLAT_LISTING = false; /** * MEMBER VARIABLES @@ -1615,15 +1626,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @param includeMetadata * if set, the listed items will have their metadata populated * already. - * + * @param useFlatBlobListing + * if set the list is flat, otherwise it is hierarchical. + * * @returns blobItems : iterable collection of blob items. * @throws URISyntaxException * */ - private Iterable listRootBlobs(boolean includeMetadata) - throws StorageException, URISyntaxException { + private Iterable listRootBlobs(boolean includeMetadata, + boolean useFlatBlobListing) throws StorageException, URISyntaxException { return rootDirectory.listBlobs( - null, false, + null, useFlatBlobListing, includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet.noneOf(BlobListingDetails.class), @@ -1643,16 +1656,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @param includeMetadata * if set, the listed items will have their metadata populated * already. + * @param useFlatBlobListing + * if set the list is flat, otherwise it is hierarchical. * * @returns blobItems : iterable collection of blob items. * @throws URISyntaxException * */ - private Iterable listRootBlobs(String aPrefix, - boolean includeMetadata) throws StorageException, URISyntaxException { + private Iterable listRootBlobs(String aPrefix, boolean includeMetadata, + boolean useFlatBlobListing) throws StorageException, URISyntaxException { Iterable list = rootDirectory.listBlobs(aPrefix, - false, + useFlatBlobListing, includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet.noneOf(BlobListingDetails.class), @@ -2050,11 +2065,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { prefix += PATH_DELIMITER; } + // Enable flat listing option only if depth is unbounded and config + // KEY_ENABLE_FLAT_LISTING is enabled. 
+ boolean enableFlatListing = false; + if (maxListingDepth < 0 && sessionConfiguration.getBoolean( + KEY_ENABLE_FLAT_LISTING, DEFAULT_ENABLE_FLAT_LISTING)) { + enableFlatListing = true; + } + Iterable objects; if (prefix.equals("/")) { - objects = listRootBlobs(true); + objects = listRootBlobs(true, enableFlatListing); } else { - objects = listRootBlobs(prefix, true); + objects = listRootBlobs(prefix, true, enableFlatListing); } ArrayList fileMetadata = new ArrayList(); @@ -2122,10 +2145,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { fileMetadata.add(directoryMetadata); } - // Currently at a depth of one, decrement the listing depth for - // sub-directories. - buildUpList(directory, fileMetadata, maxListingCount, - maxListingDepth - 1); + if (!enableFlatListing) { + // Currently at a depth of one, decrement the listing depth for + // sub-directories. + buildUpList(directory, fileMetadata, maxListingCount, + maxListingDepth - 1); + } } } // Note: Original code indicated that this may be a hack. @@ -2633,7 +2658,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } // Get all blob items with the given prefix from the container and delete // them. 
- Iterable objects = listRootBlobs(prefix, false); + Iterable objects = listRootBlobs(prefix, false, false); for (ListBlobItem blobItem : objects) { ((CloudBlob) blobItem).delete(DeleteSnapshotsOption.NONE, null, null, getInstrumentedContext()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index a8321436189..fb0d31f330b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -59,23 +59,21 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; -import org.apache.hadoop.fs.azure.AzureException; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.Time; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.microsoft.azure.storage.StorageException; - -import org.apache.hadoop.io.IOUtils; - /** * A {@link FileSystem} for reading and writing files stored on Windows Azure. 
This implementation is @@ -90,6 +88,7 @@ public class NativeAzureFileSystem extends FileSystem { * A description of a folder rename operation, including the source and * destination keys, and descriptions of the files in the source folder. */ + public static class FolderRenamePending { private SelfRenewingLease folderLease; private String srcKey; @@ -112,6 +111,7 @@ public class NativeAzureFileSystem extends FileSystem { ArrayList fileMetadataList = new ArrayList(); // List all the files in the folder. + long start = Time.monotonicNow(); String priorLastKey = null; do { PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, @@ -122,6 +122,9 @@ public class NativeAzureFileSystem extends FileSystem { priorLastKey = listing.getPriorLastKey(); } while (priorLastKey != null); fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); + long end = Time.monotonicNow(); + LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start)); + this.committed = true; } @@ -419,23 +422,18 @@ public class NativeAzureFileSystem extends FileSystem { */ public void execute() throws IOException { - for (FileMetadata file : this.getFiles()) { - - // Rename all materialized entries under the folder to point to the - // final destination. - if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { - String srcName = file.getKey(); - String suffix = srcName.substring((this.getSrcKey()).length()); - String dstName = this.getDstKey() + suffix; - - // Rename gets exclusive access (via a lease) for files - // designated for atomic rename. - // The main use case is for HBase write-ahead log (WAL) and data - // folder processing correctness. See the rename code for details. 
- boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); - fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); + AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { + @Override + public boolean execute(FileMetadata file) throws IOException{ + renameFile(file); + return true; } - } + }; + + AzureFileSystemThreadPoolExecutor executor = this.fs.getThreadPoolExecutor(this.fs.renameThreadCount, + "AzureBlobRenameThread", "Rename", getSrcKey(), AZURE_RENAME_THREADS); + + executor.executeParallel(this.getFiles(), task); // Rename the source folder 0-byte root file itself. FileMetadata srcMetadata2 = this.getSourceMetadata(); @@ -454,6 +452,25 @@ public class NativeAzureFileSystem extends FileSystem { fs.updateParentFolderLastModifiedTime(dstKey); } + // Rename a single file + @VisibleForTesting + void renameFile(FileMetadata file) throws IOException{ + // Rename all materialized entries under the folder to point to the + // final destination. + if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { + String srcName = file.getKey(); + String suffix = srcName.substring((this.getSrcKey()).length()); + String dstName = this.getDstKey() + suffix; + + // Rename gets exclusive access (via a lease) for files + // designated for atomic rename. + // The main use case is for HBase write-ahead log (WAL) and data + // folder processing correctness. See the rename code for details. + boolean acquireLease = this.fs.getStoreInterface().isAtomicRenameKey(srcName); + this.fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); + } + } + /** Clean up after execution of rename. * @throws IOException */ public void cleanup() throws IOException { @@ -662,6 +679,36 @@ public class NativeAzureFileSystem extends FileSystem { */ public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support"; + /** + * The configuration property to set number of threads to be used for rename operation. 
+ */ + public static final String AZURE_RENAME_THREADS = "fs.azure.rename.threads"; + + /** + * The default number of threads to be used for rename operation. + */ + public static final int DEFAULT_AZURE_RENAME_THREADS = 0; + + /** + * The configuration property to set number of threads to be used for delete operation. + */ + public static final String AZURE_DELETE_THREADS = "fs.azure.delete.threads"; + + /** + * The default number of threads to be used for delete operation. + */ + public static final int DEFAULT_AZURE_DELETE_THREADS = 0; + + /** + * The number of threads to be used for delete operation after reading user configuration. + */ + private int deleteThreadCount = 0; + + /** + * The number of threads to be used for rename operation after reading user configuration. + */ + private int renameThreadCount = 0; + private class NativeAzureFsInputStream extends FSInputStream { private InputStream in; private final String key; @@ -1172,6 +1219,9 @@ public class NativeAzureFileSystem extends FileSystem { LOG.debug(" blockSize = {}", conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); + // Initialize thread counts from user configuration + deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS); + renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS); } private NativeFileSystemStore createDefaultStore(Configuration conf) { @@ -1779,77 +1829,65 @@ public class NativeAzureFileSystem extends FileSystem { // List all the blobs in the current folder. 
String priorLastKey = null; - PartialListing listing = null; - try { - listing = store.listAll(key, AZURE_LIST_ALL, 1, - priorLastKey); - } catch(IOException e) { - Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); + // Start time for list operation + long start = Time.monotonicNow(); + ArrayList fileMetadataList = new ArrayList(); - if (innerException instanceof StorageException - && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { - return false; + // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth. + do { + try { + PartialListing listing = store.listAll(key, AZURE_LIST_ALL, + AZURE_UNBOUNDED_DEPTH, priorLastKey); + for(FileMetadata file : listing.getFiles()) { + fileMetadataList.add(file); + } + priorLastKey = listing.getPriorLastKey(); + } catch (IOException e) { + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); + + if (innerException instanceof StorageException + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { + return false; + } + + throw e; } + } while (priorLastKey != null); - throw e; + long end = Time.monotonicNow(); + LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start)); + + final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); + + if (!recursive && contents.length > 0) { + // The folder is non-empty and recursive delete was not specified. + // Throw an exception indicating that a non-recursive delete was + // specified for a non-empty folder. + throw new IOException("Non-recursive delete of non-empty directory " + + f.toString()); } - if (listing == null) { + // Delete all files / folders in current directory stored as list in 'contents'. 
+ AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { + @Override + public boolean execute(FileMetadata file) throws IOException{ + return deleteFile(file.getKey(), file.isDir()); + } + }; + + AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount, + "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS); + + if (!executor.executeParallel(contents, task)) { + LOG.error("Failed to delete files / subfolders in blob {}", key); return false; } - FileMetadata[] contents = listing.getFiles(); - if (!recursive && contents.length > 0) { - // The folder is non-empty and recursive delete was not specified. - // Throw an exception indicating that a non-recursive delete was - // specified for a non-empty folder. - throw new IOException("Non-recursive delete of non-empty directory " - + f.toString()); - } - - // Delete all the files in the folder. - for (FileMetadata p : contents) { - // Tag on the directory name found as the suffix of the suffix of the - // parent directory to get the new absolute path. - String suffix = p.getKey().substring( - p.getKey().lastIndexOf(PATH_DELIMITER)); - if (!p.isDir()) { - try { - store.delete(key + suffix); - instrumentation.fileDeleted(); - } catch(IOException e) { - - Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); - - if (innerException instanceof StorageException - && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { - return false; - } - - throw e; - } - } else { - // Recursively delete contents of the sub-folders. Notice this also - // deletes the blob for the directory. 
- if (!delete(new Path(f.toString() + suffix), true)) { - return false; - } - } - } - - try { - store.delete(key); - } catch(IOException e) { - - Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); - - if (innerException instanceof StorageException - && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { - return false; - } - - throw e; + // Delete the current directory + if (!deleteFile(metaFile.getKey(), metaFile.isDir())) { + LOG.error("Failed delete directory {}", f.toString()); + return false; } // Update parent directory last modified time @@ -1859,7 +1897,6 @@ public class NativeAzureFileSystem extends FileSystem { updateParentFolderLastModifiedTime(key); } } - instrumentation.directoryDeleted(); } // File or directory was successfully deleted. @@ -1867,6 +1904,35 @@ public class NativeAzureFileSystem extends FileSystem { return true; } + public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount, + String threadNamePrefix, String operation, String key, String config) { + return new AzureFileSystemThreadPoolExecutor(threadCount, threadNamePrefix, operation, key, config); + } + + // Delete single file / directory from key. + @VisibleForTesting + boolean deleteFile(String key, boolean isDir) throws IOException { + try { + store.delete(key); + if (isDir) { + instrumentation.directoryDeleted(); + } else { + instrumentation.fileDeleted(); + } + } catch(IOException e) { + Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); + + if (innerException instanceof StorageException + && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { + return false; + } + + throw e; + } + + return true; + } + @Override public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException { @@ -2517,7 +2583,8 @@ public class NativeAzureFileSystem extends FileSystem { * @param dstKey Destination folder name. 
 * @throws IOException */ - private FolderRenamePending prepareAtomicFolderRename( + @VisibleForTesting + FolderRenamePending prepareAtomicFolderRename( String srcKey, String dstKey) throws IOException { if (store.isAtomicRenameKey(srcKey)) { diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index a4a761521d5..7699bfc3faf 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -24,6 +24,7 @@ * [Atomic Folder Rename](#Atomic_Folder_Rename) * [Accessing wasb URLs](#Accessing_wasb_URLs) * [Append API Support and Configuration](#Append_API_Support_and_Configuration) + * [Multithread Support](#Multithread_Support) * [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module) ## Introduction @@ -263,6 +264,24 @@ It becomes a responsibility of the application either to ensure single-threaded file path, or rely on some external locking mechanism of its own. Failure to do so will result in unexpected behavior. +### Multithread Support + +Rename and Delete blob operations on directories with a large number of files and subdirectories are currently very slow because these operations are done serially, one blob at a time. These files and subdirectories can be deleted or renamed in parallel. The following configurations can be used to enable threads for parallel processing. + +To enable 10 threads for the Delete operation, use the configuration below. Set the configuration value to 0 or 1 to disable threads. By default, threads are disabled. + + <property> + <name>fs.azure.delete.threads</name> + <value>10</value> + </property> + +To enable 20 threads for the Rename operation, use the configuration below. Set the configuration value to 0 or 1 to disable threads. By default, threads are disabled. + + <property> + <name>fs.azure.rename.threads</name> + <value>20</value> + </property> + ## Testing the hadoop-azure Module The hadoop-azure module includes a full suite of unit tests. 
Most of the tests diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsWithThreads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsWithThreads.java new file mode 100644 index 00000000000..64316f66576 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsWithThreads.java @@ -0,0 +1,720 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azure; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.apache.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +/** + * Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations. + */ +public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase { + + private final int renameThreads = 10; + private final int deleteThreads = 20; + private int iterations = 1; + private LogCapturer logs = null; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = fs.getConf(); + + // By default enable parallel threads for rename and delete operations. + // Also enable flat listing of blobs for these operations. + conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, renameThreads); + conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, deleteThreads); + conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, true); + + URI uri = fs.getUri(); + fs.initialize(uri, conf); + + // Capture logs + logs = LogCapturer.captureLogs(new Log4JLogger(Logger + .getRootLogger())); + } + + /* + * Helper method to create sub directory and different types of files + * for multiple iterations. 
+ */ + private void createFolder(FileSystem fs, String root) throws Exception { + fs.mkdirs(new Path(root)); + for (int i = 0; i < this.iterations; i++) { + fs.mkdirs(new Path(root + "/" + i)); + fs.createNewFile(new Path(root + "/" + i + "/fileToRename")); + fs.createNewFile(new Path(root + "/" + i + "/file/to/rename")); + fs.createNewFile(new Path(root + "/" + i + "/file+to%rename")); + fs.createNewFile(new Path(root + "/fileToRename" + i)); + } + } + + /* + * Helper method to do rename operation and validate all files in source folder + * doesn't exists and similar files exists in new folder. + */ + private void validateRenameFolder(FileSystem fs, String source, String dest) throws Exception { + // Create source folder with files. + createFolder(fs, source); + Path sourceFolder = new Path(source); + Path destFolder = new Path(dest); + + // rename operation + assertTrue(fs.rename(sourceFolder, destFolder)); + assertTrue(fs.exists(destFolder)); + + for (int i = 0; i < this.iterations; i++) { + // Check destination folder and files exists. + assertTrue(fs.exists(new Path(dest + "/" + i))); + assertTrue(fs.exists(new Path(dest + "/" + i + "/fileToRename"))); + assertTrue(fs.exists(new Path(dest + "/" + i + "/file/to/rename"))); + assertTrue(fs.exists(new Path(dest + "/" + i + "/file+to%rename"))); + assertTrue(fs.exists(new Path(dest + "/fileToRename" + i))); + + // Check source folder and files doesn't exists. + assertFalse(fs.exists(new Path(source + "/" + i))); + assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename"))); + assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename"))); + assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename"))); + assertFalse(fs.exists(new Path(source + "/fileToRename" + i))); + } + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. 
+ */ + @Test + public void testRenameSmallFolderWithThreads() throws Exception { + + validateRenameFolder(fs, "root", "rootnew"); + + // With single iteration, we would have created 7 blobs. + int expectedThreadsCreated = Math.min(7, renameThreads); + + // Validate from logs that threads are created. + String content = logs.getOutput(); + assertTrue(content.contains("ms with threads: " + expectedThreadsCreated)); + + // Validate thread executions + for (int i = 0; i < expectedThreadsCreated; i++) { + assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i)); + } + + // Also ensure that we haven't spawned extra threads. + if (expectedThreadsCreated < renameThreads) { + for (int i = expectedThreadsCreated; i < renameThreads; i++) { + assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. + */ + @Test + public void testRenameLargeFolderWithThreads() throws Exception { + + // Populate source folder with large number of files and directories. + this.iterations = 10; + validateRenameFolder(fs, "root", "rootnew"); + + // Validate from logs that threads are created. + String content = logs.getOutput(); + assertTrue(content.contains("ms with threads: " + renameThreads)); + + // Validate thread executions + for (int i = 0; i < renameThreads; i++) { + assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Test case for rename operation with threads disabled and flat listing enabled. + */ + @Test + public void testRenameLargeFolderDisableThreads() throws Exception { + Configuration conf = fs.getConf(); + + // Number of threads set to 0 or 1 disables threads. 
+ conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 0); + URI uri = fs.getUri(); + fs.initialize(uri, conf); + + // Populate source folder with large number of files and directories. + this.iterations = 10; + validateRenameFolder(fs, "root", "rootnew"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Disabling threads for Rename operation as thread count 0")); + + // Validate no thread executions + for (int i = 0; i < renameThreads; i++) { + assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Test case for rename operation with threads and flat listing disabled. + */ + @Test + public void testRenameSmallFolderDisableThreadsDisableFlatListing() throws Exception { + Configuration conf = fs.getConf(); + conf = fs.getConf(); + + // Number of threads set to 0 or 1 disables threads. + conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 1); + conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false); + URI uri = fs.getUri(); + fs.initialize(uri, conf); + + validateRenameFolder(fs, "root", "rootnew"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Disabling threads for Rename operation as thread count 1")); + + // Validate no thread executions + for (int i = 0; i < renameThreads; i++) { + assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Helper method to do delete operation and validate all files in source folder + * doesn't exists after delete operation. + */ + private void validateDeleteFolder(FileSystem fs, String source) throws Exception { + // Create folder with files. 
+ createFolder(fs, "root"); + Path sourceFolder = new Path(source); + + // Delete operation + assertTrue(fs.delete(sourceFolder, true)); + assertFalse(fs.exists(sourceFolder)); + + for (int i = 0; i < this.iterations; i++) { + // check that source folder and files doesn't exists + assertFalse(fs.exists(new Path(source + "/" + i))); + assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename"))); + assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename"))); + assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename"))); + assertFalse(fs.exists(new Path(source + "/fileToRename" + i))); + } + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteSmallFolderWithThreads() throws Exception { + + validateDeleteFolder(fs, "root"); + + // With single iteration, we would have created 7 blobs. + int expectedThreadsCreated = Math.min(7, deleteThreads); + + // Validate from logs that threads are enabled. + String content = logs.getOutput(); + assertTrue(content.contains("ms with threads: " + expectedThreadsCreated)); + + // Validate thread executions + for (int i = 0; i < expectedThreadsCreated; i++) { + assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i)); + } + + // Also ensure that we haven't spawned extra threads. + if (expectedThreadsCreated < deleteThreads) { + for (int i = expectedThreadsCreated; i < deleteThreads; i++) { + assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteLargeFolderWithThreads() throws Exception { + // Populate source folder with large number of files and directories. + this.iterations = 10; + validateDeleteFolder(fs, "root"); + + // Validate from logs that threads are enabled. 
+ String content = logs.getOutput(); + assertTrue(content.contains("ms with threads: " + deleteThreads)); + + // Validate thread executions + for (int i = 0; i < deleteThreads; i++) { + assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Test case for delete operation with threads disabled and flat listing enabled. + */ + @Test + public void testDeleteLargeFolderDisableThreads() throws Exception { + Configuration conf = fs.getConf(); + conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 0); + URI uri = fs.getUri(); + fs.initialize(uri, conf); + + // Populate source folder with large number of files and directories. + this.iterations = 10; + validateDeleteFolder(fs, "root"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Disabling threads for Delete operation as thread count 0")); + + // Validate no thread executions + for (int i = 0; i < deleteThreads; i++) { + assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Test case for rename operation with threads and flat listing disabled. + */ + @Test + public void testDeleteSmallFolderDisableThreadsDisableFlatListing() throws Exception { + Configuration conf = fs.getConf(); + + // Number of threads set to 0 or 1 disables threads. + conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 1); + conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false); + URI uri = fs.getUri(); + fs.initialize(uri, conf); + + validateDeleteFolder(fs, "root"); + + // Validate from logs that threads are disabled. 
+ String content = logs.getOutput(); + assertTrue(content.contains("Disabling threads for Delete operation as thread count 1")); + + // Validate no thread executions + for (int i = 0; i < deleteThreads; i++) { + assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i)); + } + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteThreadPoolExceptionFailure() throws Exception { + + // Spy azure file system object and raise exception for new thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception()); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor); + + validateDeleteFolder(mockFs, "root"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Failed to create thread pool with threads")); + assertTrue(content.contains("Serializing the Delete operation")); + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteThreadPoolExecuteFailure() throws Exception { + + // Mock thread pool executor to throw exception for all requests. 
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class); + Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor); + + validateDeleteFolder(mockFs, "root"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Rejected execution of thread for Delete operation on blob")); + assertTrue(content.contains("Serializing the Delete operation")); + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteThreadPoolExecuteSingleThreadFailure() throws Exception { + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + // Spy a thread pool executor and link it to azure file system object. 
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor); + + // Create a thread executor and link it to mocked thread pool executor object. + ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + // Mock thread executor to throw exception for all requests. + Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + + validateDeleteFolder(mockFs, "root"); + + // Validate from logs that threads are enabled and unused threads. + String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Delete operation with threads 7")); + assertTrue(content.contains("6 threads not used for Delete operation on blob")); + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteThreadPoolTerminationFailure() throws Exception { + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + // Spy a thread pool executor and link it to azure file system object. 
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + ((NativeAzureFileSystem) fs).getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)); + + // Create a thread executor and link it to mocked thread pool executor object. + // Mock thread executor to throw exception for terminating threads. + ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class); + Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException()); + + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete", + path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor); + + createFolder(mockFs, "root"); + Path sourceFolder = new Path("root"); + boolean exception = false; + try { + mockFs.delete(sourceFolder, true); + } catch (IOException e){ + exception = true; + } + + assertTrue(exception); + assertTrue(mockFs.exists(sourceFolder)); + + // Validate from logs that threads are enabled and delete operation is failed. + String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Delete operation with threads")); + assertTrue(content.contains("Threads got interrupted Delete blob operation")); + assertTrue(content.contains("Delete failed as operation on subfolders and files failed.")); + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. 
+ */ + @Test + public void testDeleteSingleDeleteFailure() throws Exception { + + // Spy azure file system object and return false for deleting one file + LOG.info("testDeleteSingleDeleteFailure"); + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0"))); + Mockito.when(mockFs.deleteFile(path, true)).thenReturn(false); + + createFolder(mockFs, "root"); + Path sourceFolder = new Path("root"); + assertFalse(mockFs.delete(sourceFolder, true)); + assertTrue(mockFs.exists(sourceFolder)); + + // Validate from logs that threads are enabled and delete operation failed. + String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Delete operation with threads")); + assertTrue(content.contains("Delete operation failed for file " + path)); + assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed")); + assertTrue(content.contains("Failed to delete files / subfolders in blob")); + } + + /* + * Test case for delete operation with multiple threads and flat listing enabled. + */ + @Test + public void testDeleteSingleDeleteException() throws Exception { + + // Spy azure file system object and raise exception for deleting one file + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0"))); + Mockito.doThrow(new IOException()).when(mockFs).deleteFile(path, true); + + createFolder(mockFs, "root"); + Path sourceFolder = new Path("root"); + + boolean exception = false; + try { + mockFs.delete(sourceFolder, true); + } catch (IOException e){ + exception = true; + } + + assertTrue(exception); + assertTrue(mockFs.exists(sourceFolder)); + + // Validate from logs that threads are enabled and delete operation failed. 
+ String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Delete operation with threads")); + assertTrue(content.contains("Encountered Exception for Delete operation for file " + path)); + assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed")); + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. + */ + @Test + public void testRenameThreadPoolExceptionFailure() throws Exception { + + // Spy azure file system object and raise exception for new thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + ((NativeAzureFileSystem) fs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception()); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS); + + validateRenameFolder(mockFs, "root", "rootnew"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Failed to create thread pool with threads")); + assertTrue(content.contains("Serializing the Rename operation")); + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. + */ + @Test + public void testRenameThreadPoolExecuteFailure() throws Exception { + + // Mock thread pool executor to throw exception for all requests. 
+ ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class); + Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor); + + validateRenameFolder(mockFs, "root", "rootnew"); + + // Validate from logs that threads are disabled. + String content = logs.getOutput(); + assertTrue(content.contains("Rejected execution of thread for Rename operation on blob")); + assertTrue(content.contains("Serializing the Rename operation")); + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. + */ + @Test + public void testRenameThreadPoolExecuteSingleThreadFailure() throws Exception { + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + // Spy a thread pool executor and link it to azure file system object. 
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor); + + // Create a thread executor and link it to mocked thread pool executor object. + ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7)); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + // Mock thread executor to throw exception for all requests. + Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + + validateRenameFolder(mockFs, "root", "rootnew"); + + // Validate from logs that threads are enabled and unused threads exists. + String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Rename operation with threads 7")); + assertTrue(content.contains("6 threads not used for Rename operation on blob")); + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. + */ + @Test + public void testRenameThreadPoolTerminationFailure() throws Exception { + + // Spy azure file system object and return mocked thread pool + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + // Spy a thread pool executor and link it to azure file system object. 
+ String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root"))); + AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy( + mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)); + + // With single iteration, we would have created 7 blobs resulting 7 threads. + Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename", + path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor); + + // Mock thread executor to throw exception for all requests. + ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class); + Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class)); + Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException()); + Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor); + + + createFolder(mockFs, "root"); + Path sourceFolder = new Path("root"); + Path destFolder = new Path("rootnew"); + boolean exception = false; + try { + mockFs.rename(sourceFolder, destFolder); + } catch (IOException e){ + exception = true; + } + + assertTrue(exception); + assertTrue(mockFs.exists(sourceFolder)); + + // Validate from logs that threads are enabled and rename operation is failed. + String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Rename operation with threads")); + assertTrue(content.contains("Threads got interrupted Rename blob operation")); + assertTrue(content.contains("Rename failed as operation on subfolders and files failed.")); + } + + /* + * Test case for rename operation with multiple threads and flat listing enabled. 
+ */ + @Test + public void testRenameSingleRenameException() throws Exception { + + // Spy azure file system object and raise exception for deleting one file + Path sourceFolder = new Path("root"); + Path destFolder = new Path("rootnew"); + + // Spy azure file system object and populate rename pending spy object. + NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs); + + // Populate data now only such that rename pending spy object would see this data. + createFolder(mockFs, "root"); + + String srcKey = mockFs.pathToKey(mockFs.makeAbsolute(sourceFolder)); + String dstKey = mockFs.pathToKey(mockFs.makeAbsolute(destFolder)); + + FolderRenamePending mockRenameFs = Mockito.spy(mockFs.prepareAtomicFolderRename(srcKey, dstKey)); + Mockito.when(mockFs.prepareAtomicFolderRename(srcKey, dstKey)).thenReturn(mockRenameFs); + String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0"))); + Mockito.doThrow(new IOException()).when(mockRenameFs).renameFile(Mockito.any(FileMetadata.class)); + + boolean exception = false; + try { + mockFs.rename(sourceFolder, destFolder); + } catch (IOException e){ + exception = true; + } + + assertTrue(exception); + assertTrue(mockFs.exists(sourceFolder)); + + // Validate from logs that threads are enabled and delete operation failed. 
+ String content = logs.getOutput(); + assertTrue(content.contains("Using thread pool for Rename operation with threads")); + assertTrue(content.contains("Encountered Exception for Rename operation for file " + path)); + assertTrue(content.contains("Terminating execution of Rename operation now as some other thread already got exception or operation failed")); + } + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties index bb5cbe5ec32..73ee3f9c6ce 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties @@ -20,4 +20,6 @@ log4j.rootLogger=INFO,stdout log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n + +log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG