From 45d9568aaaf532a6da11bd7c1844ff81bf66bab1 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 19 Jul 2018 12:31:19 -0700
Subject: [PATCH] HADOOP-15547. WASB: improve listStatus performance.
Contributed by Thomas Marquardt.
(cherry picked from commit 749fff577ed9afb4ef8a54b8948f74be083cc620)
---
.../dev-support/findbugs-exclude.xml | 10 +
hadoop-tools/hadoop-azure/pom.xml | 12 +
.../fs/azure/AzureNativeFileSystemStore.java | 182 +++++----
.../apache/hadoop/fs/azure/FileMetadata.java | 77 ++--
.../fs/azure/NativeAzureFileSystem.java | 376 +++++++-----------
.../fs/azure/NativeFileSystemStore.java | 15 +-
.../hadoop/fs/azure/PartialListing.java | 61 ---
.../hadoop/fs/azure/ITestListPerformance.java | 196 +++++++++
8 files changed, 514 insertions(+), 415 deletions(-)
delete mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
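
The heart of this change: the paginated PartialListing protocol is removed, and
NativeFileSystemStore#list() now returns a plain FileMetadata[] in one call. A
minimal before/after sketch of a caller, distilled from the rename and delete
paths below:

    // Before: page through results with a continuation key.
    ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
    String priorLastKey = null;
    do {
      PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
          AZURE_UNBOUNDED_DEPTH, priorLastKey);
      for (FileMetadata file : listing.getFiles()) {
        fileMetadataList.add(file);
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    // After: a single call; continuation and de-duplication happen
    // inside the store.
    FileMetadata[] contents = store.list(key, AZURE_LIST_ALL,
        AZURE_UNBOUNDED_DEPTH);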
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index cde1734dbc1..38de35e897a 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -47,4 +47,14 @@
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 44b67a0b063..52b5b726a13 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -43,6 +43,8 @@
<fs.azure.scale.test.huge.partitionsize>unset</fs.azure.scale.test.huge.partitionsize>
<fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>10</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>1000</fs.azure.scale.test.list.performance.files>
@@ -298,6 +300,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<include>**/Test*.java</include>
@@ -326,6 +330,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<include>**/TestRollingWindowAverage*.java</include>
@@ -367,6 +373,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
@@ -412,6 +420,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<exclude>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</exclude>
@@ -454,6 +464,8 @@
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
<forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<trimStackTrace>false</trimStackTrace>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 197ab22be21..d2f9ca69947 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -30,7 +30,6 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.security.InvalidKeyException;
-import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
@@ -128,6 +127,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// computed as min(2*cpu,8)
private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
+ private static final String HADOOP_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
@@ -252,6 +252,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Default block sizes
public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+ public static final long DEFAULT_HADOOP_BLOCK_SIZE = 512 * 1024 * 1024L;
private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
@@ -313,6 +314,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
+ private long hadoopBlockSize = DEFAULT_HADOOP_BLOCK_SIZE;
private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
@@ -740,6 +742,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
this.uploadBlockSizeBytes = sessionConfiguration.getInt(
KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
+ this.hadoopBlockSize = sessionConfiguration.getLong(
+ HADOOP_BLOCK_SIZE_PROPERTY_NAME, DEFAULT_HADOOP_BLOCK_SIZE);
this.inputStreamVersion = sessionConfiguration.getInt(
KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
@@ -1234,7 +1238,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return false;
}
-
+ /**
+ * Returns the file block size. This is a fake value used for integration
+ * of the Azure store with Hadoop.
+ */
+ @Override
+ public long getHadoopBlockSize() {
+ return hadoopBlockSize;
+ }
/**
* This should be called from any method that does any modifications to the
@@ -2066,7 +2077,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// The key refers to root directory of container.
// Set the modification time for root to zero.
return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
- BlobMaterialization.Implicit);
+ BlobMaterialization.Implicit, hadoopBlockSize);
}
CloudBlobWrapper blob = getBlobReference(key);
@@ -2086,7 +2097,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
if (retrieveFolderAttribute(blob)) {
LOG.debug("{} is a folder blob.", key);
return new FileMetadata(key, properties.getLastModified().getTime(),
- getPermissionStatus(blob), BlobMaterialization.Explicit);
+ getPermissionStatus(blob), BlobMaterialization.Explicit, hadoopBlockSize);
} else {
LOG.debug("{} is a normal blob.", key);
@@ -2095,7 +2106,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
key, // Always return denormalized key with metadata.
getDataLength(blob, properties),
properties.getLastModified().getTime(),
- getPermissionStatus(blob));
+ getPermissionStatus(blob), hadoopBlockSize);
}
} catch(StorageException e){
if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) {
@@ -2129,7 +2140,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
BlobProperties properties = blob.getProperties();
return new FileMetadata(key, properties.getLastModified().getTime(),
- getPermissionStatus(blob), BlobMaterialization.Implicit);
+ getPermissionStatus(blob), BlobMaterialization.Implicit, hadoopBlockSize);
}
}
@@ -2178,46 +2189,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
@Override
- public PartialListing list(String prefix, final int maxListingCount,
+ public FileMetadata[] list(String prefix, final int maxListingCount,
final int maxListingDepth) throws IOException {
- return list(prefix, maxListingCount, maxListingDepth, null);
+ return listInternal(prefix, maxListingCount, maxListingDepth);
}
- @Override
- public PartialListing list(String prefix, final int maxListingCount,
- final int maxListingDepth, String priorLastKey) throws IOException {
- return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
- priorLastKey);
- }
-
- @Override
- public PartialListing listAll(String prefix, final int maxListingCount,
- final int maxListingDepth, String priorLastKey) throws IOException {
- return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
- }
-
- /**
- * Searches the given list of {@link FileMetadata} objects for a directory
- * with the given key.
- *
- * @param list
- * The list to search.
- * @param key
- * The key to search for.
- * @return The wanted directory, or null if not found.
- */
- private static FileMetadata getFileMetadataInList(
- final Iterable<FileMetadata> list, String key) {
- for (FileMetadata current : list) {
- if (current.getKey().equals(key)) {
- return current;
- }
- }
- return null;
- }
-
- private PartialListing list(String prefix, String delimiter,
- final int maxListingCount, final int maxListingDepth, String priorLastKey)
+ private FileMetadata[] listInternal(String prefix, final int maxListingCount,
+ final int maxListingDepth)
throws IOException {
try {
checkContainer(ContainerAccessType.PureRead);
@@ -2241,7 +2219,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
objects = listRootBlobs(prefix, true, enableFlatListing);
}
- ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
+ HashMap<String, FileMetadata> fileMetadata = new HashMap<>(256);
+
for (ListBlobItem blobItem : objects) {
// Check that the maximum listing count is not exhausted.
//
@@ -2261,25 +2240,37 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
FileMetadata metadata;
if (retrieveFolderAttribute(blob)) {
- metadata = new FileMetadata(blobKey,
- properties.getLastModified().getTime(),
- getPermissionStatus(blob),
- BlobMaterialization.Explicit);
+ metadata = new FileMetadata(blobKey,
+ properties.getLastModified().getTime(),
+ getPermissionStatus(blob),
+ BlobMaterialization.Explicit,
+ hadoopBlockSize);
} else {
- metadata = new FileMetadata(
- blobKey,
- getDataLength(blob, properties),
- properties.getLastModified().getTime(),
- getPermissionStatus(blob));
+ metadata = new FileMetadata(
+ blobKey,
+ getDataLength(blob, properties),
+ properties.getLastModified().getTime(),
+ getPermissionStatus(blob),
+ hadoopBlockSize);
}
+ // Add the metadata but remove duplicates. Note that the Azure
+ // storage Java SDK returns two types of entries: CloudBlobWrapper
+ // and CloudBlobDirectoryWrapper. In the case where WASB generated the
+ // data, there will be an empty blob for each "directory", and we will
+ // receive a CloudBlobWrapper. If there are also files within this
+ // "directory", we will also receive a CloudBlobDirectoryWrapper. To
+ // complicate matters, the data may not be generated by WASB, in
+ // which case we may not have an empty blob for each "directory".
+ // So, sometimes we receive both a CloudBlobWrapper and a
+ // CloudBlobDirectoryWrapper for each directory, and sometimes we
+ // receive one or the other but not both. We remove duplicates, but
+ // prefer CloudBlobWrapper over CloudBlobDirectoryWrapper.
+ // Furthermore, it is very unfortunate that the list results are not
+ // ordered, and it is a partial list which uses continuation. So
+ // the HashMap is the best structure to remove the duplicates, despite
+ // its potentially large size.
+ fileMetadata.put(blobKey, metadata);
- // Add the metadata to the list, but remove any existing duplicate
- // entries first that we may have added by finding nested files.
- FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
- if (existing != null) {
- fileMetadata.remove(existing);
- }
- fileMetadata.add(metadata);
} else if (blobItem instanceof CloudBlobDirectoryWrapper) {
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
// Determine format of directory name depending on whether an absolute
@@ -2298,12 +2289,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// inherit the permissions of the first non-directory blob.
// Also, getting a proper value for last-modified is tricky.
FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
- defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit);
+ defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit,
+ hadoopBlockSize);
// Add the directory metadata to the list only if it's not already
- // there.
- if (getFileMetadataInList(fileMetadata, dirKey) == null) {
- fileMetadata.add(directoryMetadata);
+ // there. See the earlier note: we prefer CloudBlobWrapper over
+ // CloudBlobDirectoryWrapper because it may carry additional metadata
+ // (properties and ACLs).
+ if (!fileMetadata.containsKey(dirKey)) {
+ fileMetadata.put(dirKey, directoryMetadata);
}
if (!enableFlatListing) {
@@ -2314,13 +2308,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
}
}
}
- // Note: Original code indicated that this may be a hack.
- priorLastKey = null;
- PartialListing listing = new PartialListing(priorLastKey,
- fileMetadata.toArray(new FileMetadata[] {}),
- 0 == fileMetadata.size() ? new String[] {}
- : new String[] { prefix });
- return listing;
+ return fileMetadata.values().toArray(new FileMetadata[fileMetadata.size()]);
} catch (Exception e) {
// Re-throw as an Azure storage exception.
//
@@ -2334,13 +2322,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* the sorted order of the blob names.
*
* @param aCloudBlobDirectory Azure blob directory
- * @param aFileMetadataList a list of file metadata objects for each
+ * @param metadataHashMap a map of file metadata objects for each
* non-directory blob.
* @param maxListingCount maximum length of the built up list.
*/
private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
- ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount,
- final int maxListingDepth) throws Exception {
+ HashMap<String, FileMetadata> metadataHashMap, final int maxListingCount,
+ final int maxListingDepth) throws Exception {
// Push the blob directory onto the stack.
//
@@ -2371,12 +2359,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// (2) maxListingCount > 0 implies that the number of items in the
// metadata list is less than the max listing count.
while (null != blobItemIterator
- && (maxListingCount <= 0 || aFileMetadataList.size() < maxListingCount)) {
+ && (maxListingCount <= 0 || metadataHashMap.size() < maxListingCount)) {
while (blobItemIterator.hasNext()) {
// Check if the count of items on the list exhausts the maximum
// listing count.
//
- if (0 < maxListingCount && aFileMetadataList.size() >= maxListingCount) {
+ if (0 < maxListingCount && metadataHashMap.size() >= maxListingCount) {
break;
}
@@ -2399,22 +2387,34 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
metadata = new FileMetadata(blobKey,
properties.getLastModified().getTime(),
getPermissionStatus(blob),
- BlobMaterialization.Explicit);
+ BlobMaterialization.Explicit,
+ hadoopBlockSize);
} else {
metadata = new FileMetadata(
blobKey,
getDataLength(blob, properties),
properties.getLastModified().getTime(),
- getPermissionStatus(blob));
+ getPermissionStatus(blob),
+ hadoopBlockSize);
}
- // Add the directory metadata to the list only if it's not already
- // there.
- FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey);
- if (existing != null) {
- aFileMetadataList.remove(existing);
- }
- aFileMetadataList.add(metadata);
+ // Add the metadata but remove duplicates. Note that the Azure
+ // storage Java SDK returns two types of entries: CloudBlobWrapper
+ // and CloudBlobDirectoryWrapper. In the case where WASB generated the
+ // data, there will be an empty blob for each "directory", and we will
+ // receive a CloudBlobWrapper. If there are also files within this
+ // "directory", we will also receive a CloudBlobDirectoryWrapper. To
+ // complicate matters, the data may not be generated by WASB, in
+ // which case we may not have an empty blob for each "directory".
+ // So, sometimes we receive both a CloudBlobWrapper and a
+ // CloudBlobDirectoryWrapper for each directory, and sometimes we
+ // receive one or the other but not both. We remove duplicates, but
+ // prefer CloudBlobWrapper over CloudBlobDirectoryWrapper.
+ // Furthermore, it is very unfortunate that the list results are not
+ // ordered, and it is a partial list which uses continuation. So
+ // the HashMap is the best structure to remove the duplicates, despite
+ // its potentially large size.
+ metadataHashMap.put(blobKey, metadata);
} else if (blobItem instanceof CloudBlobDirectoryWrapper) {
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
@@ -2439,7 +2439,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// absolute path is being used or not.
String dirKey = normalizeKey(directory);
- if (getFileMetadataInList(aFileMetadataList, dirKey) == null) {
+ // Add the directory metadata to the list only if it's not already
+ // there. See the earlier note: we prefer CloudBlobWrapper over
+ // CloudBlobDirectoryWrapper because it may carry additional metadata
+ // (properties and ACLs).
+ if (!metadataHashMap.containsKey(dirKey)) {
+
// Reached the targeted listing depth. Return metadata for the
// directory using default permissions.
//
@@ -2450,10 +2455,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
FileMetadata directoryMetadata = new FileMetadata(dirKey,
0,
defaultPermissionNoBlobMetadata(),
- BlobMaterialization.Implicit);
+ BlobMaterialization.Implicit,
+ hadoopBlockSize);
// Add the directory metadata to the list.
- aFileMetadataList.add(directoryMetadata);
+ metadataHashMap.put(dirKey, directoryMetadata);
}
}
}
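
The de-duplication rule described in the comments above can be shown with a
small standalone model (a hypothetical sketch, not part of this patch): a
CloudBlobWrapper entry always claims the map slot, while a
CloudBlobDirectoryWrapper entry only fills an empty one, so the explicit blob
wins whenever both forms of a directory appear.

    import java.util.HashMap;
    import java.util.Map;

    public class DedupSketch {
      public static void main(String[] args) {
        Map<String, String> byKey = new HashMap<>();
        // CloudBlobWrapper for "data/dir1" (WASB's empty marker blob):
        // unconditional put, so it replaces any directory entry.
        byKey.put("data/dir1", "explicit (blob-backed) directory");
        // CloudBlobDirectoryWrapper for the same key: keep the blob entry.
        if (!byKey.containsKey("data/dir1")) {
          byKey.put("data/dir1", "implicit directory");
        }
        // A directory with no marker blob: only the directory form arrives.
        if (!byKey.containsKey("data/dir2")) {
          byKey.put("data/dir2", "implicit directory");
        }
        // dir1 stays explicit, dir2 is implicit (map order unspecified).
        System.out.println(byKey);
      }
    }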
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
index 5085a0f7dbc..cbf3ab96160 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.PermissionStatus;
/**
@@ -27,12 +29,9 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
*
*/
@InterfaceAudience.Private
-class FileMetadata {
- private final String key;
- private final long length;
- private final long lastModified;
- private final boolean isDir;
- private final PermissionStatus permissionStatus;
+class FileMetadata extends FileStatus {
+ // this is not final so that it can be cleared to save memory when not needed.
+ private String key;
private final BlobMaterialization blobMaterialization;
/**
@@ -46,16 +45,19 @@ class FileMetadata {
* The last modified date (milliseconds since January 1, 1970 UTC.)
* @param permissionStatus
* The permission for the file.
+ * @param blockSize
+ * The Hadoop file block size.
*/
public FileMetadata(String key, long length, long lastModified,
- PermissionStatus permissionStatus) {
+ PermissionStatus permissionStatus, final long blockSize) {
+ super(length, false, 1, blockSize, lastModified, 0,
+ permissionStatus.getPermission(),
+ permissionStatus.getUserName(),
+ permissionStatus.getGroupName(),
+ null);
this.key = key;
- this.length = length;
- this.lastModified = lastModified;
- this.isDir = false;
- this.permissionStatus = permissionStatus;
- this.blobMaterialization = BlobMaterialization.Explicit; // File are never
- // implicit.
+ // Files are never implicit.
+ this.blobMaterialization = BlobMaterialization.Explicit;
}
/**
@@ -70,37 +72,42 @@ class FileMetadata {
* @param blobMaterialization
* Whether this is an implicit (no real blob backing it) or explicit
* directory.
+ * @param blockSize
+ * The Hadoop file block size.
*/
public FileMetadata(String key, long lastModified,
- PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) {
+ PermissionStatus permissionStatus, BlobMaterialization blobMaterialization,
+ final long blockSize) {
+ super(0, true, 1, blockSize, lastModified, 0,
+ permissionStatus.getPermission(),
+ permissionStatus.getUserName(),
+ permissionStatus.getGroupName(),
+ null);
this.key = key;
- this.isDir = true;
- this.length = 0;
- this.lastModified = lastModified;
- this.permissionStatus = permissionStatus;
this.blobMaterialization = blobMaterialization;
}
- public boolean isDir() {
- return isDir;
+ @Override
+ public Path getPath() {
+ Path p = super.getPath();
+ if (p == null) {
+ // Don't store this yet to reduce memory usage, as it will
+ // stay in the Eden Space and later we will update it
+ // with the full canonicalized path.
+ p = NativeAzureFileSystem.keyToPath(key);
+ }
+ return p;
}
+ /**
+ * Returns the Azure storage key for the file. Used internally by the framework.
+ *
+ * @return The key for the file.
+ */
public String getKey() {
return key;
}
- public long getLength() {
- return length;
- }
-
- public long getLastModified() {
- return lastModified;
- }
-
- public PermissionStatus getPermissionStatus() {
- return permissionStatus;
- }
-
/**
* Indicates whether this is an implicit directory (no real blob backing it)
* or an explicit one.
@@ -112,9 +119,7 @@ class FileMetadata {
return blobMaterialization;
}
- @Override
- public String toString() {
- return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
- + permissionStatus + "]";
+ void removeKey() {
+ key = null;
}
}
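
Because FileMetadata now extends FileStatus, every caller in the rest of this
patch migrates from the old accessors to the inherited ones:

    // meta.isDir()                                -> meta.isDirectory()
    // meta.getLength()                            -> meta.getLen()
    // meta.getLastModified()                      -> meta.getModificationTime()
    // meta.getPermissionStatus().getPermission()  -> meta.getPermission()
    // meta.getPermissionStatus().getUserName()    -> meta.getOwner()
    // meta.getPermissionStatus().getGroupName()   -> meta.getGroup()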
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 52027621ef1..f8962d9b170 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -31,9 +31,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
-import java.util.Set;
import java.util.TimeZone;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
@@ -129,20 +127,12 @@ public class NativeAzureFileSystem extends FileSystem {
this.dstKey = dstKey;
this.folderLease = lease;
this.fs = fs;
- ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
// List all the files in the folder.
long start = Time.monotonicNow();
- String priorLastKey = null;
- do {
- PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
- AZURE_UNBOUNDED_DEPTH, priorLastKey);
- for(FileMetadata file : listing.getFiles()) {
- fileMetadataList.add(file);
- }
- priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
- fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+ fileMetadata = fs.getStoreInterface().list(srcKey, AZURE_LIST_ALL,
+ AZURE_UNBOUNDED_DEPTH);
+
long end = Time.monotonicNow();
LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
@@ -669,7 +659,6 @@ public class NativeAzureFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);
- static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
/**
* The time span in seconds before which we consider a temp blob to be
* dangling (not being actively uploaded to) and up for reclamation.
@@ -685,8 +674,6 @@ public class NativeAzureFileSystem extends FileSystem {
private static final int AZURE_LIST_ALL = -1;
private static final int AZURE_UNBOUNDED_DEPTH = -1;
- private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
-
/**
* The configuration property that determines which group owns files created
* in WASB.
@@ -1196,7 +1183,6 @@ public class NativeAzureFileSystem extends FileSystem {
private NativeFileSystemStore store;
private AzureNativeFileSystemStore actualStore;
private Path workingDir;
- private long blockSize = MAX_AZURE_BLOCK_SIZE;
private AzureFileSystemInstrumentation instrumentation;
private String metricsSourceName;
private boolean isClosed = false;
@@ -1361,13 +1347,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
.getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
- this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
- MAX_AZURE_BLOCK_SIZE);
this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing.");
- LOG.debug(" blockSize = {}",
- conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
+ LOG.debug(" blockSize = {}", store.getHadoopBlockSize());
// Initialize thread counts from user configuration
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
@@ -1491,7 +1474,7 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
- private static Path keyToPath(String key) {
+ static Path keyToPath(String key) {
if (key.equals("/")) {
return new Path("/"); // container
}
@@ -1599,7 +1582,7 @@ public class NativeAzureFileSystem extends FileSystem {
throw new FileNotFoundException(f.toString());
}
- if (meta.isDir()) {
+ if (meta.isDirectory()) {
throw new FileNotFoundException(f.toString()
+ " is a directory not a file.");
}
@@ -1815,7 +1798,7 @@ public class NativeAzureFileSystem extends FileSystem {
FileMetadata existingMetadata = store.retrieveMetadata(key);
if (existingMetadata != null) {
- if (existingMetadata.isDir()) {
+ if (existingMetadata.isDirectory()) {
throw new FileAlreadyExistsException("Cannot create file " + f
+ "; already exists as a directory.");
}
@@ -1833,7 +1816,7 @@ public class NativeAzureFileSystem extends FileSystem {
// already exists.
String parentKey = pathToKey(parentFolder);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
- if (parentMetadata != null && parentMetadata.isDir() &&
+ if (parentMetadata != null && parentMetadata.isDirectory() &&
parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
if (parentFolderLease != null) {
store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
@@ -1850,7 +1833,7 @@ public class NativeAzureFileSystem extends FileSystem {
firstExisting = firstExisting.getParent();
metadata = store.retrieveMetadata(pathToKey(firstExisting));
}
- mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
+ mkdirs(parentFolder, metadata.getPermission(), true);
}
}
@@ -1988,7 +1971,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
- if (!parentMetadata.isDir()) {
+ if (!parentMetadata.isDirectory()) {
// Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve.");
@@ -1997,7 +1980,7 @@ public class NativeAzureFileSystem extends FileSystem {
// The path exists, determine if it is a folder containing objects,
// an empty folder, or a simple file and take the appropriate actions.
- if (!metaFile.isDir()) {
+ if (!metaFile.isDirectory()) {
// The path specifies a file. We need to check the parent path
// to make sure it's a proper materialized directory before we
// delete the file. Otherwise we may get into a situation where
@@ -2114,9 +2097,9 @@ public class NativeAzureFileSystem extends FileSystem {
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
@Override
public boolean execute(FileMetadata file) throws IOException{
- if (!deleteFile(file.getKey(), file.isDir())) {
+ if (!deleteFile(file.getKey(), file.isDirectory())) {
LOG.warn("Attempt to delete non-existent {} {}",
- file.isDir() ? "directory" : "file",
+ file.isDirectory() ? "directory" : "file",
file.getKey());
}
return true;
@@ -2138,7 +2121,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Delete the current directory if all underlying contents are deleted
if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
- && !deleteFile(metaFile.getKey(), metaFile.isDir()))) {
+ && !deleteFile(metaFile.getKey(), metaFile.isDirectory()))) {
LOG.error("Failed delete directory : {}", f);
return false;
}
@@ -2191,7 +2174,7 @@ public class NativeAzureFileSystem extends FileSystem {
// The path exists, determine if it is a folder containing objects,
// an empty folder, or a simple file and take the appropriate actions.
- if (!metaFile.isDir()) {
+ if (!metaFile.isDirectory()) {
// The path specifies a file. We need to check the parent path
// to make sure it's a proper materialized directory before we
// delete the file. Otherwise we may get into a situation where
@@ -2234,7 +2217,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
- if (!parentMetadata.isDir()) {
+ if (!parentMetadata.isDirectory()) {
// Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve.");
@@ -2319,38 +2302,27 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
- // List all the blobs in the current folder.
- String priorLastKey = null;
-
// Start time for list operation
long start = Time.monotonicNow();
- ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
+ final FileMetadata[] contents;
// List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
- do {
- try {
- PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
- AZURE_UNBOUNDED_DEPTH, priorLastKey);
- for(FileMetadata file : listing.getFiles()) {
- fileMetadataList.add(file);
- }
- priorLastKey = listing.getPriorLastKey();
- } catch (IOException e) {
- Throwable innerException = checkForAzureStorageException(e);
+ try {
+ contents = store.list(key, AZURE_LIST_ALL,
+ AZURE_UNBOUNDED_DEPTH);
+ } catch (IOException e) {
+ Throwable innerException = checkForAzureStorageException(e);
- if (innerException instanceof StorageException
- && isFileNotFoundException((StorageException) innerException)) {
- return false;
- }
-
- throw e;
+ if (innerException instanceof StorageException
+ && isFileNotFoundException((StorageException) innerException)) {
+ return false;
}
- } while (priorLastKey != null);
+
+ throw e;
+ }
long end = Time.monotonicNow();
- LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start));
-
- final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+ LOG.debug("Time taken to list {} blobs for delete operation: {} ms", contents.length, (end - start));
if (contents.length > 0) {
if (!recursive) {
@@ -2365,9 +2337,9 @@ public class NativeAzureFileSystem extends FileSystem {
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
@Override
public boolean execute(FileMetadata file) throws IOException{
- if (!deleteFile(file.getKey(), file.isDir())) {
+ if (!deleteFile(file.getKey(), file.isDirectory())) {
LOG.warn("Attempt to delete non-existent {} {}",
- file.isDir() ? "directory" : "file",
+ file.isDirectory() ? "directory" : "file",
file.getKey());
}
return true;
@@ -2384,7 +2356,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Delete the current directory
if (store.retrieveMetadata(metaFile.getKey()) != null
- && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
+ && !deleteFile(metaFile.getKey(), metaFile.isDirectory())) {
LOG.error("Failed delete directory : {}", f);
return false;
}
@@ -2456,13 +2428,13 @@ public class NativeAzureFileSystem extends FileSystem {
boolean isPartialDelete = false;
- Path pathToDelete = makeAbsolute(keyToPath(folderToDelete.getKey()));
+ Path pathToDelete = makeAbsolute(folderToDelete.getPath());
foldersToProcess.push(folderToDelete);
while (!foldersToProcess.empty()) {
FileMetadata currentFolder = foldersToProcess.pop();
- Path currentPath = makeAbsolute(keyToPath(currentFolder.getKey()));
+ Path currentPath = makeAbsolute(currentFolder.getPath());
boolean canDeleteChildren = true;
// If authorization is enabled, check for 'write' permission on current folder
@@ -2478,8 +2450,8 @@ public class NativeAzureFileSystem extends FileSystem {
if (canDeleteChildren) {
// get immediate children list
- ArrayList<FileMetadata> fileMetadataList = getChildrenMetadata(currentFolder.getKey(),
- maxListingDepth);
+ FileMetadata[] fileMetadataList = store.list(currentFolder.getKey(),
+ AZURE_LIST_ALL, maxListingDepth);
// Process children of currentFolder and add them to list of contents
// that can be deleted. We perform a sticky bit check on every file and
@@ -2490,12 +2462,12 @@ public class NativeAzureFileSystem extends FileSystem {
// This file/folder cannot be deleted and neither can the parent paths be deleted.
// Remove parent paths from list of contents that can be deleted.
canDeleteChildren = false;
- Path filePath = makeAbsolute(keyToPath(childItem.getKey()));
+ Path filePath = makeAbsolute(childItem.getPath());
LOG.error("User does not have permissions to delete {}. "
+ "Parent directory has sticky bit set.", filePath);
} else {
// push the child directories to the stack to process their contents
- if (childItem.isDir()) {
+ if (childItem.isDirectory()) {
foldersToProcess.push(childItem);
}
// Add items to list of contents that can be deleted.
@@ -2540,23 +2512,6 @@ public class NativeAzureFileSystem extends FileSystem {
return isPartialDelete;
}
- private ArrayList<FileMetadata> getChildrenMetadata(String key, int maxListingDepth)
- throws IOException {
-
- String priorLastKey = null;
- ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
- do {
- PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
- maxListingDepth, priorLastKey);
- for (FileMetadata file : listing.getFiles()) {
- fileMetadataList.add(file);
- }
- priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
-
- return fileMetadataList;
- }
-
private boolean isStickyBitCheckViolated(FileMetadata metaData,
FileMetadata parentMetadata, boolean throwOnException) throws IOException {
try {
@@ -2602,13 +2557,13 @@ public class NativeAzureFileSystem extends FileSystem {
}
// stickybit is not set on parent and hence cannot be violated
- if (!parentMetadata.getPermissionStatus().getPermission().getStickyBit()) {
+ if (!parentMetadata.getPermission().getStickyBit()) {
return false;
}
String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
- String parentDirectoryOwner = parentMetadata.getPermissionStatus().getUserName();
- String currentFileOwner = metaData.getPermissionStatus().getUserName();
+ String parentDirectoryOwner = parentMetadata.getOwner();
+ String currentFileOwner = metaData.getOwner();
// Files/Folders with no owner set will not pass stickybit check
if ((parentDirectoryOwner.equalsIgnoreCase(currentUser))
@@ -2687,7 +2642,15 @@ public class NativeAzureFileSystem extends FileSystem {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
if (key.length() == 0) { // root always exists
- return newDirectory(null, absolutePath);
+ return new FileStatus(
+ 0,
+ true,
+ 1,
+ store.getHadoopBlockSize(),
+ 0,
+ 0,
+ FsPermission.getDefault(), "", "",
+ absolutePath.makeQualified(getUri(), getWorkingDirectory()));
}
// The path is either a folder or a file. Retrieve metadata to
@@ -2709,7 +2672,7 @@ public class NativeAzureFileSystem extends FileSystem {
}
if (meta != null) {
- if (meta.isDir()) {
+ if (meta.isDirectory()) {
// The path is a folder with files in it.
//
@@ -2723,14 +2686,14 @@ public class NativeAzureFileSystem extends FileSystem {
}
// Return reference to the directory object.
- return newDirectory(meta, absolutePath);
+ return updateFileStatusPath(meta, absolutePath);
}
// The path is a file.
LOG.debug("Found the path: {} as a file.", f.toString());
// Return with reference to a file object.
- return newFile(meta, absolutePath);
+ return updateFileStatusPath(meta, absolutePath);
}
// File not found. Throw exception no such file or directory.
@@ -2787,7 +2750,7 @@ public class NativeAzureFileSystem extends FileSystem {
performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath);
String key = pathToKey(absolutePath);
- Set<FileStatus> status = new TreeSet<FileStatus>();
+
FileMetadata meta = null;
try {
meta = store.retrieveMetadata(key);
@@ -2804,101 +2767,93 @@ public class NativeAzureFileSystem extends FileSystem {
throw ex;
}
- if (meta != null) {
- if (!meta.isDir()) {
+ if (meta == null) {
+ // There is no metadata found for the path.
+ LOG.debug("Did not find any metadata for path: {}", key);
+ throw new FileNotFoundException(f + " is not found");
+ }
- LOG.debug("Found path as a file");
+ if (!meta.isDirectory()) {
+ LOG.debug("Found path as a file");
+ return new FileStatus[] { updateFileStatusPath(meta, absolutePath) };
+ }
- return new FileStatus[] { newFile(meta, absolutePath) };
- }
+ FileMetadata[] listing;
- String partialKey = null;
- PartialListing listing = null;
+ listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
- try {
- listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
- } catch (IOException ex) {
+ // NOTE: we don't check for a null result, as the store API should
+ // return an empty list when there is nothing to list.
- Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+ // For any -RenamePending.json files in the listing,
+ // push the rename forward.
+ boolean renamed = conditionalRedoFolderRenames(listing);
- if (innerException instanceof StorageException
- && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+ // If any renames were redone, get another listing,
+ // since the current one may have changed due to the redo.
+ if (renamed) {
+ listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
+ }
- throw new FileNotFoundException(String.format("%s is not found", key));
- }
+ // We only need to check for AZURE_TEMP_FOLDER if the key is the root,
+ // and if it is not the root we also know the exact size of the array
+ // of FileStatus.
- throw ex;
- }
- // NOTE: We don't check for Null condition as the Store API should return
- // an empty list if there are not listing.
+ FileMetadata[] result = null;
- // For any -RenamePending.json files in the listing,
- // push the rename forward.
- boolean renamed = conditionalRedoFolderRenames(listing);
+ if (key.equals("/")) {
+ ArrayList<FileMetadata> status = new ArrayList<>(listing.length);
- // If any renames were redone, get another listing,
- // since the current one may have changed due to the redo.
- if (renamed) {
- listing = null;
- try {
- listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
- } catch (IOException ex) {
- Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
-
- if (innerException instanceof StorageException
- && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
- throw new FileNotFoundException(String.format("%s is not found", key));
- }
-
- throw ex;
- }
- }
-
- // NOTE: We don't check for Null condition as the Store API should return
- // and empty list if there are not listing.
-
- for (FileMetadata fileMetadata : listing.getFiles()) {
- Path subpath = keyToPath(fileMetadata.getKey());
-
- // Test whether the metadata represents a file or directory and
- // add the appropriate metadata object.
- //
- // Note: There was a very old bug here where directories were added
- // to the status set as files flattening out recursive listings
- // using "-lsr" down the file system hierarchy.
- if (fileMetadata.isDir()) {
+ for (FileMetadata fileMetadata : listing) {
+ if (fileMetadata.isDirectory()) {
// Make sure we hide the temp upload folder
if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
// Don't expose that.
continue;
}
- status.add(newDirectory(fileMetadata, subpath));
+ status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
} else {
- status.add(newFile(fileMetadata, subpath));
+ status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
}
}
-
- LOG.debug("Found path as a directory with {}"
- + " files in it.", status.size());
-
+ result = status.toArray(new FileMetadata[0]);
} else {
- // There is no metadata found for the path.
- LOG.debug("Did not find any metadata for path: {}", key);
-
- throw new FileNotFoundException(f + " is not found");
+ for (int i = 0; i < listing.length; i++) {
+ FileMetadata fileMetadata = listing[i];
+ listing[i] = updateFileStatusPath(fileMetadata, fileMetadata.getPath());
+ }
+ result = listing;
}
- return status.toArray(new FileStatus[0]);
+ LOG.debug("Found path as a directory with {}"
+ + " files in it.", result.length);
+
+ return result;
+ }
+
+ private FileMetadata[] listWithErrorHandling(String prefix, final int maxListingCount,
+ final int maxListingDepth) throws IOException {
+ try {
+ return store.list(prefix, maxListingCount, maxListingDepth);
+ } catch (IOException ex) {
+ Throwable innerException
+ = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+ if (innerException instanceof StorageException
+ && NativeAzureFileSystemHelper.isFileNotFoundException(
+ (StorageException) innerException)) {
+ throw new FileNotFoundException(String.format("%s is not found", prefix));
+ }
+ throw ex;
+ }
}
// Redo any folder renames needed if there are rename pending files in the
// directory listing. Return true if one or more redo operations were done.
- private boolean conditionalRedoFolderRenames(PartialListing listing)
+ private boolean conditionalRedoFolderRenames(FileMetadata[] listing)
throws IllegalArgumentException, IOException {
boolean renamed = false;
- for (FileMetadata fileMetadata : listing.getFiles()) {
- Path subpath = keyToPath(fileMetadata.getKey());
+ for (FileMetadata fileMetadata : listing) {
+ Path subpath = fileMetadata.getPath();
if (isRenamePendingFile(subpath)) {
FolderRenamePending pending =
new FolderRenamePending(subpath, this);
@@ -2914,32 +2869,11 @@ public class NativeAzureFileSystem extends FileSystem {
return path.toString().endsWith(FolderRenamePending.SUFFIX);
}
- private FileStatus newFile(FileMetadata meta, Path path) {
- return new FileStatus (
- meta.getLength(),
- false,
- 1,
- blockSize,
- meta.getLastModified(),
- 0,
- meta.getPermissionStatus().getPermission(),
- meta.getPermissionStatus().getUserName(),
- meta.getPermissionStatus().getGroupName(),
- path.makeQualified(getUri(), getWorkingDirectory()));
- }
-
- private FileStatus newDirectory(FileMetadata meta, Path path) {
- return new FileStatus (
- 0,
- true,
- 1,
- blockSize,
- meta == null ? 0 : meta.getLastModified(),
- 0,
- meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
- meta == null ? "" : meta.getPermissionStatus().getUserName(),
- meta == null ? "" : meta.getPermissionStatus().getGroupName(),
- path.makeQualified(getUri(), getWorkingDirectory()));
+ private FileMetadata updateFileStatusPath(FileMetadata meta, Path path) {
+ meta.setPath(path.makeQualified(getUri(), getWorkingDirectory()));
+ // reduce memory use by setting the internal-only key to null
+ meta.removeKey();
+ return meta;
}
private static enum UMaskApplyMode {
@@ -3000,8 +2934,8 @@ public class NativeAzureFileSystem extends FileSystem {
String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
- if (currentMetadata != null && currentMetadata.isDir()) {
- Path ancestor = keyToPath(currentMetadata.getKey());
+ if (currentMetadata != null && currentMetadata.isDirectory()) {
+ Path ancestor = currentMetadata.getPath();
LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
return ancestor;
}
@@ -3052,7 +2986,7 @@ public class NativeAzureFileSystem extends FileSystem {
current = parent, parent = current.getParent()) {
String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
- if (currentMetadata != null && !currentMetadata.isDir()) {
+ if (currentMetadata != null && !currentMetadata.isDirectory()) {
throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
+ current + " is an existing file.");
} else if (currentMetadata == null) {
@@ -3099,7 +3033,7 @@ public class NativeAzureFileSystem extends FileSystem {
if (meta == null) {
throw new FileNotFoundException(f.toString());
}
- if (meta.isDir()) {
+ if (meta.isDirectory()) {
throw new FileNotFoundException(f.toString()
+ " is a directory not a file.");
}
@@ -3120,7 +3054,7 @@ public class NativeAzureFileSystem extends FileSystem {
}
return new FSDataInputStream(new BufferedFSInputStream(
- new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
+ new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
}
@Override
@@ -3196,7 +3130,7 @@ public class NativeAzureFileSystem extends FileSystem {
}
}
- if (dstMetadata != null && dstMetadata.isDir()) {
+ if (dstMetadata != null && dstMetadata.isDirectory()) {
// It's an existing directory.
performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename",
absoluteDstPath);
@@ -3232,7 +3166,7 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Parent of the destination {}"
+ " doesn't exist, failing the rename.", dst);
return false;
- } else if (!parentOfDestMetadata.isDir()) {
+ } else if (!parentOfDestMetadata.isDirectory()) {
LOG.debug("Parent of the destination {}"
+ " is a file, failing the rename.", dst);
return false;
@@ -3261,7 +3195,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Source doesn't exist
LOG.debug("Source {} doesn't exist, failing the rename.", src);
return false;
- } else if (!srcMetadata.isDir()) {
+ } else if (!srcMetadata.isDirectory()) {
LOG.debug("Source {} found as a file, renaming.", src);
try {
// HADOOP-15086 - file rename must ensure that the destination does
@@ -3335,7 +3269,7 @@ public class NativeAzureFileSystem extends FileSystem {
// single file. In this case, the parent folder no longer exists if the
// file is renamed; so we can safely ignore the null pointer case.
if (parentMetadata != null) {
- if (parentMetadata.isDir()
+ if (parentMetadata.isDirectory()
&& parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
store.storeEmptyFolder(parentKey,
createPermissionStatus(FsPermission.getDefault()));
@@ -3511,7 +3445,7 @@ public class NativeAzureFileSystem extends FileSystem {
&& !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) {
//Check if the user is the owner of the file.
- String owner = metadata.getPermissionStatus().getUserName();
+ String owner = metadata.getOwner();
if (!currentUgi.getShortUserName().equals(owner)) {
throw new WasbAuthorizationException(
String.format("user '%s' does not have the privilege to "
@@ -3522,16 +3456,16 @@ public class NativeAzureFileSystem extends FileSystem {
}
permission = applyUMask(permission,
- metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
+ metadata.isDirectory() ? UMaskApplyMode.ChangeExistingDirectory
: UMaskApplyMode.ChangeExistingFile);
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, createPermissionStatus(permission));
- } else if (!metadata.getPermissionStatus().getPermission().
+ } else if (!metadata.getPermission().
equals(permission)) {
store.changePermissionStatus(key, new PermissionStatus(
- metadata.getPermissionStatus().getUserName(),
- metadata.getPermissionStatus().getGroupName(),
+ metadata.getOwner(),
+ metadata.getGroup(),
permission));
}
}
@@ -3579,10 +3513,10 @@ public class NativeAzureFileSystem extends FileSystem {
PermissionStatus newPermissionStatus = new PermissionStatus(
username == null ?
- metadata.getPermissionStatus().getUserName() : username,
+ metadata.getOwner() : username,
groupname == null ?
- metadata.getPermissionStatus().getGroupName() : groupname,
- metadata.getPermissionStatus().getPermission());
+ metadata.getGroup() : groupname,
+ metadata.getPermission());
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, newPermissionStatus);
@@ -3778,30 +3712,26 @@ public class NativeAzureFileSystem extends FileSystem {
AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
// Go over all the blobs under the given root and look for blobs to
// recover.
- String priorLastKey = null;
- do {
- PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
- AZURE_UNBOUNDED_DEPTH, priorLastKey);
+ FileMetadata[] listing = store.list(pathToKey(root), AZURE_LIST_ALL,
+ AZURE_UNBOUNDED_DEPTH);
- for (FileMetadata file : listing.getFiles()) {
- if (!file.isDir()) { // We don't recover directory blobs
- // See if this blob has a link in it (meaning it's a place-holder
- // blob for when the upload to the temp blob is complete).
- String link = store.getLinkInFileMetadata(file.getKey());
- if (link != null) {
- // It has a link, see if the temp blob it is pointing to is
- // existent and old enough to be considered dangling.
- FileMetadata linkMetadata = store.retrieveMetadata(link);
- if (linkMetadata != null
- && linkMetadata.getLastModified() >= cutoffForDangling) {
- // Found one!
- handler.handleFile(file, linkMetadata);
- }
+ for (FileMetadata file : listing) {
+ if (!file.isDirectory()) { // We don't recover directory blobs
+ // See if this blob has a link in it (meaning it's a place-holder
+ // blob for when the upload to the temp blob is complete).
+ String link = store.getLinkInFileMetadata(file.getKey());
+ if (link != null) {
+ // It has a link, see if the temp blob it is pointing to is
+ // existent and old enough to be considered dangling.
+ FileMetadata linkMetadata = store.retrieveMetadata(link);
+ if (linkMetadata != null
+ && linkMetadata.getModificationTime() >= cutoffForDangling) {
+ // Found one!
+ handler.handleFile(file, linkMetadata);
}
}
}
- priorLastKey = listing.getPriorLastKey();
- } while (priorLastKey != null);
+ }
}
/**
@@ -3888,7 +3818,7 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key);
if (meta != null) {
- owner = meta.getPermissionStatus().getUserName();
+ owner = meta.getOwner();
LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath);
} else {
// meta will be null if the file/folder does not exist
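
Taken together, the listStatus() rework above now runs as follows (a summary of
the code in this diff, not new behavior):

    // 1. retrieveMetadata(key): a plain file returns a one-element
    //    FileStatus[]; null metadata raises FileNotFoundException.
    // 2. listWithErrorHandling(key, AZURE_LIST_ALL, 1): one store call
    //    replaces the old priorLastKey continuation loop.
    // 3. Any -RenamePending.json entries trigger a redo of the pending
    //    rename, followed by a single re-listing.
    // 4. Each FileMetadata is qualified in place (updateFileStatusPath)
    //    and its Azure key nulled to save memory; only a root listing
    //    still filters out the AZURE_TEMP_FOLDER entry.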
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index b67ab1b297b..36e3819c32b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -58,20 +58,21 @@ interface NativeFileSystemStore {
boolean isAtomicRenameKey(String key);
+ /**
+ * Returns the file block size. This is a fake value used for integration
+ * of the Azure store with Hadoop.
+ * @return The file block size.
+ */
+ long getHadoopBlockSize();
+
void storeEmptyLinkFile(String key, String tempBlobKey,
PermissionStatus permissionStatus) throws AzureException;
String getLinkInFileMetadata(String key) throws AzureException;
- PartialListing list(String prefix, final int maxListingCount,
+ FileMetadata[] list(String prefix, final int maxListingCount,
final int maxListingDepth) throws IOException;
- PartialListing list(String prefix, final int maxListingCount,
- final int maxListingDepth, String priorLastKey) throws IOException;
-
- PartialListing listAll(String prefix, final int maxListingCount,
- final int maxListingDepth, String priorLastKey) throws IOException;
-
void changePermissionStatus(String key, PermissionStatus newPermission)
throws AzureException;
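
The value behind getHadoopBlockSize() is synthetic: it feeds split computation
through FileStatus and has no effect on how blobs are stored. A hedged
configuration sketch, using the property name and 512 MB default defined in
AzureNativeFileSystemStore above:

    import org.apache.hadoop.conf.Configuration;

    public class BlockSizeSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // fs.azure.block.size; defaults to 512 * 1024 * 1024 when unset.
        conf.setLong("fs.azure.block.size", 256 * 1024 * 1024L);
        System.out.println(conf.getLong("fs.azure.block.size", 0L));
      }
    }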
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
deleted file mode 100644
index 4a80d2ef8f0..00000000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- *
- * Holds information on a directory listing for a {@link NativeFileSystemStore}.
- * This includes the {@link FileMetadata files} and directories (their names)
- * contained in a directory.
- *
- *
- * This listing may be returned in chunks, so a priorLastKey is
- * provided so that the next chunk may be requested.
- *
- *
- * @see NativeFileSystemStore#list(String, int, String)
- */
-@InterfaceAudience.Private
-class PartialListing {
-
- private final String priorLastKey;
- private final FileMetadata[] files;
- private final String[] commonPrefixes;
-
- public PartialListing(String priorLastKey, FileMetadata[] files,
- String[] commonPrefixes) {
- this.priorLastKey = priorLastKey;
- this.files = files;
- this.commonPrefixes = commonPrefixes;
- }
-
- public FileMetadata[] getFiles() {
- return files;
- }
-
- public String[] getCommonPrefixes() {
- return commonPrefixes;
- }
-
- public String getPriorLastKey() {
- return priorLastKey;
- }
-}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
new file mode 100644
index 00000000000..e7a3fa88511
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+/**
+ * Test list performance.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestListPerformance extends AbstractAzureScaleTest {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestListPerformance.class);
+
+ private static final Path TEST_DIR_PATH = new Path(
+ "DirectoryWithManyFiles");
+
+ private static final int NUMBER_OF_THREADS = 10;
+ private static final int NUMBER_OF_FILES_PER_THREAD = 1000;
+
+ private int threads;
+
+ private int filesPerThread;
+
+ private int expectedFileCount;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = getConfiguration();
+ // fail fast
+ threads = AzureTestUtils.getTestPropertyInt(conf,
+ "fs.azure.scale.test.list.performance.threads", NUMBER_OF_THREADS);
+ filesPerThread = AzureTestUtils.getTestPropertyInt(conf,
+ "fs.azure.scale.test.list.performance.files", NUMBER_OF_FILES_PER_THREAD);
+ expectedFileCount = threads * filesPerThread;
+ LOG.info("Threads = {}, files per thread = {}, expected files = {}",
+ threads, filesPerThread, expectedFileCount);
+ conf.set("fs.azure.io.retry.max.retries", "1");
+ conf.set("fs.azure.delete.threads", "16");
+ createTestAccount();
+ }
+
+ @Override
+ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+ return AzureBlobStorageTestAccount.create(
+ "itestlistperformance",
+ EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+ null,
+ true);
+ }
+
+ @Test
+ public void test_0101_CreateDirectoryWithFiles() throws Exception {
+ Assume.assumeFalse("Test path exists; skipping", fs.exists(TEST_DIR_PATH));
+
+ ExecutorService executorService = Executors.newFixedThreadPool(threads);
+ CloudBlobContainer container = testAccount.getRealContainer();
+
+ final String basePath = (fs.getWorkingDirectory().toUri().getPath() + "/" + TEST_DIR_PATH + "/").substring(1);
+
+ ArrayList<Callable<Integer>> tasks = new ArrayList<>(threads);
+ fs.mkdirs(TEST_DIR_PATH);
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ for (int i = 0; i < threads; i++) {
+ tasks.add(
+ new Callable<Integer>() {
+ public Integer call() {
+ int written = 0;
+ for (int j = 0; j < filesPerThread; j++) {
+ String blobName = basePath + UUID.randomUUID().toString();
+ try {
+ CloudBlockBlob blob = container.getBlockBlobReference(
+ blobName);
+ blob.uploadText("");
+ written++;
+ } catch (Exception e) {
+ LOG.error("Failed to write {}", blobName, e);
+ break;
+ }
+ }
+ LOG.info("Thread completed with {} files written", written);
+ return written;
+ }
+ }
+ );
+ }
+
+ List<Future<Integer>> futures = executorService.invokeAll(tasks,
+ getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
+ long elapsedMs = timer.elapsedTimeMs();
+ LOG.info("time to create files: {} millis", elapsedMs);
+
+ for (Future<Integer> future : futures) {
+ assertTrue("Future timed out", future.isDone());
+ assertEquals("Future did not write all of its files",
+ filesPerThread, future.get().intValue());
+ }
+ }
+
+ @Test
+ public void test_0200_ListStatusPerformance() throws Exception {
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ FileStatus[] fileList = fs.listStatus(TEST_DIR_PATH);
+ long elapsedMs = timer.elapsedTimeMs();
+ LOG.info(String.format(
+ "files=%1$d, elapsedMs=%2$d",
+ fileList.length,
+ elapsedMs));
+ Map<Path, FileStatus> foundInList = new HashMap<>(expectedFileCount);
+
+ for (FileStatus fileStatus : fileList) {
+ foundInList.put(fileStatus.getPath(), fileStatus);
+ LOG.info("{}: {}", fileStatus.getPath(),
+ fileStatus.isDirectory() ? "dir" : "file");
+ }
+ assertEquals("Mismatch between expected files and actual",
+ expectedFileCount, fileList.length);
+
+
+ // now do a listFiles() recursive
+ ContractTestUtils.NanoTimer initialStatusCallTimer
+ = new ContractTestUtils.NanoTimer();
+ RemoteIterator<LocatedFileStatus> listing
+ = fs.listFiles(TEST_DIR_PATH, true);
+ long initialListTime = initialStatusCallTimer.elapsedTimeMs();
+ timer = new ContractTestUtils.NanoTimer();
+ while (listing.hasNext()) {
+ FileStatus fileStatus = listing.next();
+ Path path = fileStatus.getPath();
+ FileStatus removed = foundInList.remove(path);
+ assertNotNull("Did not find " + path + " in the previous listing",
+ removed);
+ }
+ elapsedMs = timer.elapsedTimeMs();
+ LOG.info("time for listFiles() initial call: {} millis;"
+ + " time to iterate: {} millis", initialListTime, elapsedMs);
+ assertEquals("Not all files from listStatus() were found in listFiles()",
+ 0, foundInList.size());
+
+ }
+
+ @Test
+ public void test_0300_BulkDeletePerformance() throws Exception {
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ fs.delete(TEST_DIR_PATH, true);
+ long elapsedMs = timer.elapsedTimeMs();
+ LOG.info("time for delete(): {} millis; {} nanos per file",
+ elapsedMs, timer.nanosPerOperation(expectedFileCount));
+ }
+}
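
The NAME_ASCENDING method ordering is what sequences the three tests:
test_0101 creates threads * filesPerThread empty blobs through the SDK,
test_0200 times listStatus() and a recursive listFiles() over them, and
test_0300 times the bulk delete. One hedged way to run it (property names
from the pom.xml changes above; exact flags may vary with the build setup):

    mvn verify -Dit.test=ITestListPerformance \
        -Dfs.azure.scale.test.enabled=true \
        -Dfs.azure.scale.test.list.performance.threads=10 \
        -Dfs.azure.scale.test.list.performance.files=1000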