HADOOP-15547. WASB: improve listStatus performance.

Contributed by Thomas Marquardt.

(cherry picked from commit 749fff577e)
This commit is contained in:
Steve Loughran 2018-07-19 12:31:19 -07:00
parent 5836e0a46b
commit 45d9568aaa
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
8 changed files with 514 additions and 415 deletions

View File

@ -47,4 +47,14 @@
<Bug pattern="WMI_WRONG_MAP_ITERATOR" /> <Bug pattern="WMI_WRONG_MAP_ITERATOR" />
<Priority value="2" /> <Priority value="2" />
</Match> </Match>
<!-- FileMetadata is used internally for storing metadata but also
subclasses FileStatus to reduce allocations when listing a large number
of files. When it is returned to an external caller as a FileStatus, the
extra metadata is no longer useful and we want the equals and hashCode
methods of FileStatus to be used. -->
<Match>
<Class name="org.apache.hadoop.fs.azure.FileMetadata" />
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -43,6 +43,8 @@
<fs.azure.scale.test.huge.partitionsize>unset</fs.azure.scale.test.huge.partitionsize> <fs.azure.scale.test.huge.partitionsize>unset</fs.azure.scale.test.huge.partitionsize>
<!-- Timeout in seconds for scale tests.--> <!-- Timeout in seconds for scale tests.-->
<fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>10</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>1000</fs.azure.scale.test.list.performance.files>
</properties> </properties>
<build> <build>
@ -298,6 +300,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize> <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize> <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables> </systemPropertyVariables>
<includes> <includes>
<include>**/Test*.java</include> <include>**/Test*.java</include>
@ -326,6 +330,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize> <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize> <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables> </systemPropertyVariables>
<includes> <includes>
<include>**/TestRollingWindowAverage*.java</include> <include>**/TestRollingWindowAverage*.java</include>
@ -367,6 +373,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize> <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize> <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables> </systemPropertyVariables>
<!-- Some tests cannot run in parallel. Tests that cover --> <!-- Some tests cannot run in parallel. Tests that cover -->
<!-- access to the root directory must run in isolation --> <!-- access to the root directory must run in isolation -->
@ -412,6 +420,8 @@
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize> <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize> <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables> </systemPropertyVariables>
<includes> <includes>
<include>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</include> <include>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</include>
@ -454,6 +464,8 @@
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled> <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize> <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout> <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
</systemPropertyVariables> </systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds> <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<trimStackTrace>false</trimStackTrace> <trimStackTrace>false</trimStackTrace>

View File

@ -30,7 +30,6 @@ import java.net.URISyntaxException;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
@ -128,6 +127,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// computed as min(2*cpu,8) // computed as min(2*cpu,8)
private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out"; private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
private static final String HADOOP_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size"; private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout"; private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size"; private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
@ -252,6 +252,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Default block sizes // Default block sizes
public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024; public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024; public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
public static final long DEFAULT_HADOOP_BLOCK_SIZE = 512 * 1024 * 1024L;
private static final int DEFAULT_INPUT_STREAM_VERSION = 2; private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
@ -313,6 +314,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND; private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
private long hadoopBlockSize = DEFAULT_HADOOP_BLOCK_SIZE;
private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE; private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE; private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION; private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
@ -740,6 +742,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE); KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
this.uploadBlockSizeBytes = sessionConfiguration.getInt( this.uploadBlockSizeBytes = sessionConfiguration.getInt(
KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE); KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
this.hadoopBlockSize = sessionConfiguration.getLong(
HADOOP_BLOCK_SIZE_PROPERTY_NAME, DEFAULT_HADOOP_BLOCK_SIZE);
this.inputStreamVersion = sessionConfiguration.getInt( this.inputStreamVersion = sessionConfiguration.getInt(
KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION); KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
@ -1234,7 +1238,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
return false; return false;
} }
/**
* Returns the file block size. This is a fake value used for integration
* of the Azure store with Hadoop.
*/
@Override
public long getHadoopBlockSize() {
return hadoopBlockSize;
}
/** /**
* This should be called from any method that does any modifications to the * This should be called from any method that does any modifications to the
@ -2066,7 +2077,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// The key refers to root directory of container. // The key refers to root directory of container.
// Set the modification time for root to zero. // Set the modification time for root to zero.
return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(), return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
BlobMaterialization.Implicit); BlobMaterialization.Implicit, hadoopBlockSize);
} }
CloudBlobWrapper blob = getBlobReference(key); CloudBlobWrapper blob = getBlobReference(key);
@ -2086,7 +2097,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
if (retrieveFolderAttribute(blob)) { if (retrieveFolderAttribute(blob)) {
LOG.debug("{} is a folder blob.", key); LOG.debug("{} is a folder blob.", key);
return new FileMetadata(key, properties.getLastModified().getTime(), return new FileMetadata(key, properties.getLastModified().getTime(),
getPermissionStatus(blob), BlobMaterialization.Explicit); getPermissionStatus(blob), BlobMaterialization.Explicit, hadoopBlockSize);
} else { } else {
LOG.debug("{} is a normal blob.", key); LOG.debug("{} is a normal blob.", key);
@ -2095,7 +2106,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
key, // Always return denormalized key with metadata. key, // Always return denormalized key with metadata.
getDataLength(blob, properties), getDataLength(blob, properties),
properties.getLastModified().getTime(), properties.getLastModified().getTime(),
getPermissionStatus(blob)); getPermissionStatus(blob), hadoopBlockSize);
} }
} catch(StorageException e){ } catch(StorageException e){
if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) { if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) {
@ -2129,7 +2140,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
BlobProperties properties = blob.getProperties(); BlobProperties properties = blob.getProperties();
return new FileMetadata(key, properties.getLastModified().getTime(), return new FileMetadata(key, properties.getLastModified().getTime(),
getPermissionStatus(blob), BlobMaterialization.Implicit); getPermissionStatus(blob), BlobMaterialization.Implicit, hadoopBlockSize);
} }
} }
@ -2178,46 +2189,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
} }
@Override @Override
public PartialListing list(String prefix, final int maxListingCount, public FileMetadata[] list(String prefix, final int maxListingCount,
final int maxListingDepth) throws IOException { final int maxListingDepth) throws IOException {
return list(prefix, maxListingCount, maxListingDepth, null); return listInternal(prefix, maxListingCount, maxListingDepth);
} }
@Override private FileMetadata[] listInternal(String prefix, final int maxListingCount,
public PartialListing list(String prefix, final int maxListingCount, final int maxListingDepth)
final int maxListingDepth, String priorLastKey) throws IOException {
return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
priorLastKey);
}
@Override
public PartialListing listAll(String prefix, final int maxListingCount,
final int maxListingDepth, String priorLastKey) throws IOException {
return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
}
/**
* Searches the given list of {@link FileMetadata} objects for a directory
* with the given key.
*
* @param list
* The list to search.
* @param key
* The key to search for.
* @return The wanted directory, or null if not found.
*/
private static FileMetadata getFileMetadataInList(
final Iterable<FileMetadata> list, String key) {
for (FileMetadata current : list) {
if (current.getKey().equals(key)) {
return current;
}
}
return null;
}
private PartialListing list(String prefix, String delimiter,
final int maxListingCount, final int maxListingDepth, String priorLastKey)
throws IOException { throws IOException {
try { try {
checkContainer(ContainerAccessType.PureRead); checkContainer(ContainerAccessType.PureRead);
@ -2241,7 +2219,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
objects = listRootBlobs(prefix, true, enableFlatListing); objects = listRootBlobs(prefix, true, enableFlatListing);
} }
ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>(); HashMap<String, FileMetadata> fileMetadata = new HashMap<>(256);
for (ListBlobItem blobItem : objects) { for (ListBlobItem blobItem : objects) {
// Check that the maximum listing count is not exhausted. // Check that the maximum listing count is not exhausted.
// //
@ -2264,22 +2243,34 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
metadata = new FileMetadata(blobKey, metadata = new FileMetadata(blobKey,
properties.getLastModified().getTime(), properties.getLastModified().getTime(),
getPermissionStatus(blob), getPermissionStatus(blob),
BlobMaterialization.Explicit); BlobMaterialization.Explicit,
hadoopBlockSize);
} else { } else {
metadata = new FileMetadata( metadata = new FileMetadata(
blobKey, blobKey,
getDataLength(blob, properties), getDataLength(blob, properties),
properties.getLastModified().getTime(), properties.getLastModified().getTime(),
getPermissionStatus(blob)); getPermissionStatus(blob),
hadoopBlockSize);
} }
// Add the metadata but remove duplicates. Note that the azure
// storage java SDK returns two types of entries: CloudBlobWrapper
// and CloudDirectoryWrapper. In the case where WASB generated the
// data, there will be an empty blob for each "directory", and we will
// receive a CloudBlobWrapper. If there are also files within this
// "directory", we will also receive a CloudDirectoryWrapper. To
// complicate matters, the data may not be generated by WASB, in
// which case we may not have an empty blob for each "directory".
// So, sometimes we receive both a CloudBlobWrapper and a
// CloudDirectoryWrapper for each directory, and sometimes we receive
// one or the other but not both. We remove duplicates, but
// prefer CloudBlobWrapper over CloudDirectoryWrapper.
// Furthermore, it is very unfortunate that the list results are not
// ordered, and it is a partial list which uses continuation. So
// the HashMap is the best structure to remove the duplicates, despite
// its potential large size.
fileMetadata.put(blobKey, metadata);
// Add the metadata to the list, but remove any existing duplicate
// entries first that we may have added by finding nested files.
FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
if (existing != null) {
fileMetadata.remove(existing);
}
fileMetadata.add(metadata);
} else if (blobItem instanceof CloudBlobDirectoryWrapper) { } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem; CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
// Determine format of directory name depending on whether an absolute // Determine format of directory name depending on whether an absolute
@ -2298,12 +2289,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// inherit the permissions of the first non-directory blob. // inherit the permissions of the first non-directory blob.
// Also, getting a proper value for last-modified is tricky. // Also, getting a proper value for last-modified is tricky.
FileMetadata directoryMetadata = new FileMetadata(dirKey, 0, FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit); defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit,
hadoopBlockSize);
// Add the directory metadata to the list only if it's not already // Add the directory metadata to the list only if it's not already
// there. // there. See earlier note, we prefer CloudBlobWrapper over
if (getFileMetadataInList(fileMetadata, dirKey) == null) { // CloudDirectoryWrapper because it may have additional metadata (
fileMetadata.add(directoryMetadata); // properties and ACLs).
if (!fileMetadata.containsKey(dirKey)) {
fileMetadata.put(dirKey, directoryMetadata);
} }
if (!enableFlatListing) { if (!enableFlatListing) {
@ -2314,13 +2308,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
} }
} }
} }
// Note: Original code indicated that this may be a hack. return fileMetadata.values().toArray(new FileMetadata[fileMetadata.size()]);
priorLastKey = null;
PartialListing listing = new PartialListing(priorLastKey,
fileMetadata.toArray(new FileMetadata[] {}),
0 == fileMetadata.size() ? new String[] {}
: new String[] { prefix });
return listing;
} catch (Exception e) { } catch (Exception e) {
// Re-throw as an Azure storage exception. // Re-throw as an Azure storage exception.
// //
@ -2334,12 +2322,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* the sorted order of the blob names. * the sorted order of the blob names.
* *
* @param aCloudBlobDirectory Azure blob directory * @param aCloudBlobDirectory Azure blob directory
* @param aFileMetadataList a list of file metadata objects for each * @param metadataHashMap a map of file metadata objects for each
* non-directory blob. * non-directory blob.
* @param maxListingCount maximum length of the built up list. * @param maxListingCount maximum length of the built up list.
*/ */
private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory, private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount, HashMap<String, FileMetadata> metadataHashMap, final int maxListingCount,
final int maxListingDepth) throws Exception { final int maxListingDepth) throws Exception {
// Push the blob directory onto the stack. // Push the blob directory onto the stack.
@ -2371,12 +2359,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// (2) maxListingCount > 0 implies that the number of items in the // (2) maxListingCount > 0 implies that the number of items in the
// metadata list is less than the max listing count. // metadata list is less than the max listing count.
while (null != blobItemIterator while (null != blobItemIterator
&& (maxListingCount <= 0 || aFileMetadataList.size() < maxListingCount)) { && (maxListingCount <= 0 || metadataHashMap.size() < maxListingCount)) {
while (blobItemIterator.hasNext()) { while (blobItemIterator.hasNext()) {
// Check if the count of items on the list exhausts the maximum // Check if the count of items on the list exhausts the maximum
// listing count. // listing count.
// //
if (0 < maxListingCount && aFileMetadataList.size() >= maxListingCount) { if (0 < maxListingCount && metadataHashMap.size() >= maxListingCount) {
break; break;
} }
@ -2399,22 +2387,34 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
metadata = new FileMetadata(blobKey, metadata = new FileMetadata(blobKey,
properties.getLastModified().getTime(), properties.getLastModified().getTime(),
getPermissionStatus(blob), getPermissionStatus(blob),
BlobMaterialization.Explicit); BlobMaterialization.Explicit,
hadoopBlockSize);
} else { } else {
metadata = new FileMetadata( metadata = new FileMetadata(
blobKey, blobKey,
getDataLength(blob, properties), getDataLength(blob, properties),
properties.getLastModified().getTime(), properties.getLastModified().getTime(),
getPermissionStatus(blob)); getPermissionStatus(blob),
hadoopBlockSize);
} }
// Add the directory metadata to the list only if it's not already // Add the metadata but remove duplicates. Note that the azure
// there. // storage java SDK returns two types of entries: CloudBlobWrapper
FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey); // and CloudDirectoryWrapper. In the case where WASB generated the
if (existing != null) { // data, there will be an empty blob for each "directory", and we will
aFileMetadataList.remove(existing); // receive a CloudBlobWrapper. If there are also files within this
} // "directory", we will also receive a CloudDirectoryWrapper. To
aFileMetadataList.add(metadata); // complicate matters, the data may not be generated by WASB, in
// which case we may not have an empty blob for each "directory".
// So, sometimes we receive both a CloudBlobWrapper and a
// CloudDirectoryWrapper for each directory, and sometimes we receive
// one or the other but not both. We remove duplicates, but
// prefer CloudBlobWrapper over CloudDirectoryWrapper.
// Furthermore, it is very unfortunate that the list results are not
// ordered, and it is a partial list which uses continuation. So
// the HashMap is the best structure to remove the duplicates, despite
// its potential large size.
metadataHashMap.put(blobKey, metadata);
} else if (blobItem instanceof CloudBlobDirectoryWrapper) { } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem; CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
@ -2439,7 +2439,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// absolute path is being used or not. // absolute path is being used or not.
String dirKey = normalizeKey(directory); String dirKey = normalizeKey(directory);
if (getFileMetadataInList(aFileMetadataList, dirKey) == null) { // Add the directory metadata to the list only if it's not already
// there. See earlier note, we prefer CloudBlobWrapper over
// CloudDirectoryWrapper because it may have additional metadata (
// properties and ACLs).
if (!metadataHashMap.containsKey(dirKey)) {
// Reached the targeted listing depth. Return metadata for the // Reached the targeted listing depth. Return metadata for the
// directory using default permissions. // directory using default permissions.
// //
@ -2450,10 +2455,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
FileMetadata directoryMetadata = new FileMetadata(dirKey, FileMetadata directoryMetadata = new FileMetadata(dirKey,
0, 0,
defaultPermissionNoBlobMetadata(), defaultPermissionNoBlobMetadata(),
BlobMaterialization.Implicit); BlobMaterialization.Implicit,
hadoopBlockSize);
// Add the directory metadata to the list. // Add the directory metadata to the list.
aFileMetadataList.add(directoryMetadata); metadataHashMap.put(dirKey, directoryMetadata);
} }
} }
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.fs.azure; package org.apache.hadoop.fs.azure;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
/** /**
@ -27,12 +29,9 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
* </p> * </p>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class FileMetadata { class FileMetadata extends FileStatus {
private final String key; // this is not final so that it can be cleared to save memory when not needed.
private final long length; private String key;
private final long lastModified;
private final boolean isDir;
private final PermissionStatus permissionStatus;
private final BlobMaterialization blobMaterialization; private final BlobMaterialization blobMaterialization;
/** /**
@ -46,16 +45,19 @@ class FileMetadata {
* The last modified date (milliseconds since January 1, 1970 UTC.) * The last modified date (milliseconds since January 1, 1970 UTC.)
* @param permissionStatus * @param permissionStatus
* The permission for the file. * The permission for the file.
* @param blockSize
* The Hadoop file block size.
*/ */
public FileMetadata(String key, long length, long lastModified, public FileMetadata(String key, long length, long lastModified,
PermissionStatus permissionStatus) { PermissionStatus permissionStatus, final long blockSize) {
super(length, false, 1, blockSize, lastModified, 0,
permissionStatus.getPermission(),
permissionStatus.getUserName(),
permissionStatus.getGroupName(),
null);
this.key = key; this.key = key;
this.length = length; // Files are never implicit.
this.lastModified = lastModified; this.blobMaterialization = BlobMaterialization.Explicit;
this.isDir = false;
this.permissionStatus = permissionStatus;
this.blobMaterialization = BlobMaterialization.Explicit; // File are never
// implicit.
} }
/** /**
@ -70,37 +72,42 @@ class FileMetadata {
* @param blobMaterialization * @param blobMaterialization
* Whether this is an implicit (no real blob backing it) or explicit * Whether this is an implicit (no real blob backing it) or explicit
* directory. * directory.
* @param blockSize
* The Hadoop file block size.
*/ */
public FileMetadata(String key, long lastModified, public FileMetadata(String key, long lastModified,
PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) { PermissionStatus permissionStatus, BlobMaterialization blobMaterialization,
final long blockSize) {
super(0, true, 1, blockSize, lastModified, 0,
permissionStatus.getPermission(),
permissionStatus.getUserName(),
permissionStatus.getGroupName(),
null);
this.key = key; this.key = key;
this.isDir = true;
this.length = 0;
this.lastModified = lastModified;
this.permissionStatus = permissionStatus;
this.blobMaterialization = blobMaterialization; this.blobMaterialization = blobMaterialization;
} }
public boolean isDir() { @Override
return isDir; public Path getPath() {
Path p = super.getPath();
if (p == null) {
// Don't store this yet to reduce memory usage, as it will
// stay in the Eden Space and later we will update it
// with the full canonicalized path.
p = NativeAzureFileSystem.keyToPath(key);
}
return p;
} }
/**
* Returns the Azure storage key for the file. Used internally by the framework.
*
* @return The key for the file.
*/
public String getKey() { public String getKey() {
return key; return key;
} }
public long getLength() {
return length;
}
public long getLastModified() {
return lastModified;
}
public PermissionStatus getPermissionStatus() {
return permissionStatus;
}
/** /**
* Indicates whether this is an implicit directory (no real blob backing it) * Indicates whether this is an implicit directory (no real blob backing it)
* or an explicit one. * or an explicit one.
@ -112,9 +119,7 @@ class FileMetadata {
return blobMaterialization; return blobMaterialization;
} }
@Override void removeKey() {
public String toString() { key = null;
return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
+ permissionStatus + "]";
} }
} }

View File

@ -31,9 +31,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -129,20 +127,12 @@ public class NativeAzureFileSystem extends FileSystem {
this.dstKey = dstKey; this.dstKey = dstKey;
this.folderLease = lease; this.folderLease = lease;
this.fs = fs; this.fs = fs;
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
// List all the files in the folder. // List all the files in the folder.
long start = Time.monotonicNow(); long start = Time.monotonicNow();
String priorLastKey = null; fileMetadata = fs.getStoreInterface().list(srcKey, AZURE_LIST_ALL,
do { AZURE_UNBOUNDED_DEPTH);
PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
AZURE_UNBOUNDED_DEPTH, priorLastKey);
for(FileMetadata file : listing.getFiles()) {
fileMetadataList.add(file);
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
long end = Time.monotonicNow(); long end = Time.monotonicNow();
LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start)); LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
@ -669,7 +659,6 @@ public class NativeAzureFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class); public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);
static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
/** /**
* The time span in seconds before which we consider a temp blob to be * The time span in seconds before which we consider a temp blob to be
* dangling (not being actively uploaded to) and up for reclamation. * dangling (not being actively uploaded to) and up for reclamation.
@ -685,8 +674,6 @@ public class NativeAzureFileSystem extends FileSystem {
private static final int AZURE_LIST_ALL = -1; private static final int AZURE_LIST_ALL = -1;
private static final int AZURE_UNBOUNDED_DEPTH = -1; private static final int AZURE_UNBOUNDED_DEPTH = -1;
private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
/** /**
* The configuration property that determines which group owns files created * The configuration property that determines which group owns files created
* in WASB. * in WASB.
@ -1196,7 +1183,6 @@ public class NativeAzureFileSystem extends FileSystem {
private NativeFileSystemStore store; private NativeFileSystemStore store;
private AzureNativeFileSystemStore actualStore; private AzureNativeFileSystemStore actualStore;
private Path workingDir; private Path workingDir;
private long blockSize = MAX_AZURE_BLOCK_SIZE;
private AzureFileSystemInstrumentation instrumentation; private AzureFileSystemInstrumentation instrumentation;
private String metricsSourceName; private String metricsSourceName;
private boolean isClosed = false; private boolean isClosed = false;
@ -1361,13 +1347,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
.getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
MAX_AZURE_BLOCK_SIZE);
this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false); this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug("NativeAzureFileSystem. Initializing.");
LOG.debug(" blockSize = {}", LOG.debug(" blockSize = {}", store.getHadoopBlockSize());
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
// Initialize thread counts from user configuration // Initialize thread counts from user configuration
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS); deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
@ -1491,7 +1474,7 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
private static Path keyToPath(String key) { static Path keyToPath(String key) {
if (key.equals("/")) { if (key.equals("/")) {
return new Path("/"); // container return new Path("/"); // container
} }
@ -1599,7 +1582,7 @@ public class NativeAzureFileSystem extends FileSystem {
throw new FileNotFoundException(f.toString()); throw new FileNotFoundException(f.toString());
} }
if (meta.isDir()) { if (meta.isDirectory()) {
throw new FileNotFoundException(f.toString() throw new FileNotFoundException(f.toString()
+ " is a directory not a file."); + " is a directory not a file.");
} }
@ -1815,7 +1798,7 @@ public class NativeAzureFileSystem extends FileSystem {
FileMetadata existingMetadata = store.retrieveMetadata(key); FileMetadata existingMetadata = store.retrieveMetadata(key);
if (existingMetadata != null) { if (existingMetadata != null) {
if (existingMetadata.isDir()) { if (existingMetadata.isDirectory()) {
throw new FileAlreadyExistsException("Cannot create file " + f throw new FileAlreadyExistsException("Cannot create file " + f
+ "; already exists as a directory."); + "; already exists as a directory.");
} }
@ -1833,7 +1816,7 @@ public class NativeAzureFileSystem extends FileSystem {
// already exists. // already exists.
String parentKey = pathToKey(parentFolder); String parentKey = pathToKey(parentFolder);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey); FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
if (parentMetadata != null && parentMetadata.isDir() && if (parentMetadata != null && parentMetadata.isDirectory() &&
parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
if (parentFolderLease != null) { if (parentFolderLease != null) {
store.updateFolderLastModifiedTime(parentKey, parentFolderLease); store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
@ -1850,7 +1833,7 @@ public class NativeAzureFileSystem extends FileSystem {
firstExisting = firstExisting.getParent(); firstExisting = firstExisting.getParent();
metadata = store.retrieveMetadata(pathToKey(firstExisting)); metadata = store.retrieveMetadata(pathToKey(firstExisting));
} }
mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); mkdirs(parentFolder, metadata.getPermission(), true);
} }
} }
@ -1988,7 +1971,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ parentPath + " whose metadata cannot be retrieved. Can't resolve"); + parentPath + " whose metadata cannot be retrieved. Can't resolve");
} }
if (!parentMetadata.isDir()) { if (!parentMetadata.isDirectory()) {
// Invalid state: the parent path is actually a file. Throw. // Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory " throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve."); + parentPath + " which is also a file. Can't resolve.");
@ -1997,7 +1980,7 @@ public class NativeAzureFileSystem extends FileSystem {
// The path exists, determine if it is a folder containing objects, // The path exists, determine if it is a folder containing objects,
// an empty folder, or a simple file and take the appropriate actions. // an empty folder, or a simple file and take the appropriate actions.
if (!metaFile.isDir()) { if (!metaFile.isDirectory()) {
// The path specifies a file. We need to check the parent path // The path specifies a file. We need to check the parent path
// to make sure it's a proper materialized directory before we // to make sure it's a proper materialized directory before we
// delete the file. Otherwise we may get into a situation where // delete the file. Otherwise we may get into a situation where
@ -2114,9 +2097,9 @@ public class NativeAzureFileSystem extends FileSystem {
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
@Override @Override
public boolean execute(FileMetadata file) throws IOException{ public boolean execute(FileMetadata file) throws IOException{
if (!deleteFile(file.getKey(), file.isDir())) { if (!deleteFile(file.getKey(), file.isDirectory())) {
LOG.warn("Attempt to delete non-existent {} {}", LOG.warn("Attempt to delete non-existent {} {}",
file.isDir() ? "directory" : "file", file.isDirectory() ? "directory" : "file",
file.getKey()); file.getKey());
} }
return true; return true;
@ -2138,7 +2121,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Delete the current directory if all underlying contents are deleted // Delete the current directory if all underlying contents are deleted
if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
&& !deleteFile(metaFile.getKey(), metaFile.isDir()))) { && !deleteFile(metaFile.getKey(), metaFile.isDirectory()))) {
LOG.error("Failed delete directory : {}", f); LOG.error("Failed delete directory : {}", f);
return false; return false;
} }
@ -2191,7 +2174,7 @@ public class NativeAzureFileSystem extends FileSystem {
// The path exists, determine if it is a folder containing objects, // The path exists, determine if it is a folder containing objects,
// an empty folder, or a simple file and take the appropriate actions. // an empty folder, or a simple file and take the appropriate actions.
if (!metaFile.isDir()) { if (!metaFile.isDirectory()) {
// The path specifies a file. We need to check the parent path // The path specifies a file. We need to check the parent path
// to make sure it's a proper materialized directory before we // to make sure it's a proper materialized directory before we
// delete the file. Otherwise we may get into a situation where // delete the file. Otherwise we may get into a situation where
@ -2234,7 +2217,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ parentPath + " whose metadata cannot be retrieved. Can't resolve"); + parentPath + " whose metadata cannot be retrieved. Can't resolve");
} }
if (!parentMetadata.isDir()) { if (!parentMetadata.isDirectory()) {
// Invalid state: the parent path is actually a file. Throw. // Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory " throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve."); + parentPath + " which is also a file. Can't resolve.");
@ -2319,22 +2302,14 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
// List all the blobs in the current folder.
String priorLastKey = null;
// Start time for list operation // Start time for list operation
long start = Time.monotonicNow(); long start = Time.monotonicNow();
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); final FileMetadata[] contents;
// List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth. // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
do {
try { try {
PartialListing listing = store.listAll(key, AZURE_LIST_ALL, contents = store.list(key, AZURE_LIST_ALL,
AZURE_UNBOUNDED_DEPTH, priorLastKey); AZURE_UNBOUNDED_DEPTH);
for(FileMetadata file : listing.getFiles()) {
fileMetadataList.add(file);
}
priorLastKey = listing.getPriorLastKey();
} catch (IOException e) { } catch (IOException e) {
Throwable innerException = checkForAzureStorageException(e); Throwable innerException = checkForAzureStorageException(e);
@ -2345,12 +2320,9 @@ public class NativeAzureFileSystem extends FileSystem {
throw e; throw e;
} }
} while (priorLastKey != null);
long end = Time.monotonicNow(); long end = Time.monotonicNow();
LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start)); LOG.debug("Time taken to list {} blobs for delete operation: {} ms", contents.length, (end - start));
final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
if (contents.length > 0) { if (contents.length > 0) {
if (!recursive) { if (!recursive) {
@ -2365,9 +2337,9 @@ public class NativeAzureFileSystem extends FileSystem {
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() { AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
@Override @Override
public boolean execute(FileMetadata file) throws IOException{ public boolean execute(FileMetadata file) throws IOException{
if (!deleteFile(file.getKey(), file.isDir())) { if (!deleteFile(file.getKey(), file.isDirectory())) {
LOG.warn("Attempt to delete non-existent {} {}", LOG.warn("Attempt to delete non-existent {} {}",
file.isDir() ? "directory" : "file", file.isDirectory() ? "directory" : "file",
file.getKey()); file.getKey());
} }
return true; return true;
@ -2384,7 +2356,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Delete the current directory // Delete the current directory
if (store.retrieveMetadata(metaFile.getKey()) != null if (store.retrieveMetadata(metaFile.getKey()) != null
&& !deleteFile(metaFile.getKey(), metaFile.isDir())) { && !deleteFile(metaFile.getKey(), metaFile.isDirectory())) {
LOG.error("Failed delete directory : {}", f); LOG.error("Failed delete directory : {}", f);
return false; return false;
} }
@ -2456,13 +2428,13 @@ public class NativeAzureFileSystem extends FileSystem {
boolean isPartialDelete = false; boolean isPartialDelete = false;
Path pathToDelete = makeAbsolute(keyToPath(folderToDelete.getKey())); Path pathToDelete = makeAbsolute(folderToDelete.getPath());
foldersToProcess.push(folderToDelete); foldersToProcess.push(folderToDelete);
while (!foldersToProcess.empty()) { while (!foldersToProcess.empty()) {
FileMetadata currentFolder = foldersToProcess.pop(); FileMetadata currentFolder = foldersToProcess.pop();
Path currentPath = makeAbsolute(keyToPath(currentFolder.getKey())); Path currentPath = makeAbsolute(currentFolder.getPath());
boolean canDeleteChildren = true; boolean canDeleteChildren = true;
// If authorization is enabled, check for 'write' permission on current folder // If authorization is enabled, check for 'write' permission on current folder
@ -2478,8 +2450,8 @@ public class NativeAzureFileSystem extends FileSystem {
if (canDeleteChildren) { if (canDeleteChildren) {
// get immediate children list // get immediate children list
ArrayList<FileMetadata> fileMetadataList = getChildrenMetadata(currentFolder.getKey(), FileMetadata[] fileMetadataList = store.list(currentFolder.getKey(),
maxListingDepth); AZURE_LIST_ALL, maxListingDepth);
// Process children of currentFolder and add them to list of contents // Process children of currentFolder and add them to list of contents
// that can be deleted. We Perform stickybit check on every file and // that can be deleted. We Perform stickybit check on every file and
@ -2490,12 +2462,12 @@ public class NativeAzureFileSystem extends FileSystem {
// This file/folder cannot be deleted and neither can the parent paths be deleted. // This file/folder cannot be deleted and neither can the parent paths be deleted.
// Remove parent paths from list of contents that can be deleted. // Remove parent paths from list of contents that can be deleted.
canDeleteChildren = false; canDeleteChildren = false;
Path filePath = makeAbsolute(keyToPath(childItem.getKey())); Path filePath = makeAbsolute(childItem.getPath());
LOG.error("User does not have permissions to delete {}. " LOG.error("User does not have permissions to delete {}. "
+ "Parent directory has sticky bit set.", filePath); + "Parent directory has sticky bit set.", filePath);
} else { } else {
// push the child directories to the stack to process their contents // push the child directories to the stack to process their contents
if (childItem.isDir()) { if (childItem.isDirectory()) {
foldersToProcess.push(childItem); foldersToProcess.push(childItem);
} }
// Add items to list of contents that can be deleted. // Add items to list of contents that can be deleted.
@ -2540,23 +2512,6 @@ public class NativeAzureFileSystem extends FileSystem {
return isPartialDelete; return isPartialDelete;
} }
private ArrayList<FileMetadata> getChildrenMetadata(String key, int maxListingDepth)
throws IOException {
String priorLastKey = null;
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
do {
PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
maxListingDepth, priorLastKey);
for (FileMetadata file : listing.getFiles()) {
fileMetadataList.add(file);
}
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
return fileMetadataList;
}
private boolean isStickyBitCheckViolated(FileMetadata metaData, private boolean isStickyBitCheckViolated(FileMetadata metaData,
FileMetadata parentMetadata, boolean throwOnException) throws IOException { FileMetadata parentMetadata, boolean throwOnException) throws IOException {
try { try {
@ -2602,13 +2557,13 @@ public class NativeAzureFileSystem extends FileSystem {
} }
// stickybit is not set on parent and hence cannot be violated // stickybit is not set on parent and hence cannot be violated
if (!parentMetadata.getPermissionStatus().getPermission().getStickyBit()) { if (!parentMetadata.getPermission().getStickyBit()) {
return false; return false;
} }
String currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
String parentDirectoryOwner = parentMetadata.getPermissionStatus().getUserName(); String parentDirectoryOwner = parentMetadata.getOwner();
String currentFileOwner = metaData.getPermissionStatus().getUserName(); String currentFileOwner = metaData.getOwner();
// Files/Folders with no owner set will not pass stickybit check // Files/Folders with no owner set will not pass stickybit check
if ((parentDirectoryOwner.equalsIgnoreCase(currentUser)) if ((parentDirectoryOwner.equalsIgnoreCase(currentUser))
@ -2687,7 +2642,15 @@ public class NativeAzureFileSystem extends FileSystem {
Path absolutePath = makeAbsolute(f); Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
if (key.length() == 0) { // root always exists if (key.length() == 0) { // root always exists
return newDirectory(null, absolutePath); return new FileStatus(
0,
true,
1,
store.getHadoopBlockSize(),
0,
0,
FsPermission.getDefault(), "", "",
absolutePath.makeQualified(getUri(), getWorkingDirectory()));
} }
// The path is either a folder or a file. Retrieve metadata to // The path is either a folder or a file. Retrieve metadata to
@ -2709,7 +2672,7 @@ public class NativeAzureFileSystem extends FileSystem {
} }
if (meta != null) { if (meta != null) {
if (meta.isDir()) { if (meta.isDirectory()) {
// The path is a folder with files in it. // The path is a folder with files in it.
// //
@ -2723,14 +2686,14 @@ public class NativeAzureFileSystem extends FileSystem {
} }
// Return reference to the directory object. // Return reference to the directory object.
return newDirectory(meta, absolutePath); return updateFileStatusPath(meta, absolutePath);
} }
// The path is a file. // The path is a file.
LOG.debug("Found the path: {} as a file.", f.toString()); LOG.debug("Found the path: {} as a file.", f.toString());
// Return with reference to a file object. // Return with reference to a file object.
return newFile(meta, absolutePath); return updateFileStatusPath(meta, absolutePath);
} }
// File not found. Throw exception no such file or directory. // File not found. Throw exception no such file or directory.
@ -2787,7 +2750,7 @@ public class NativeAzureFileSystem extends FileSystem {
performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath); performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
Set<FileStatus> status = new TreeSet<FileStatus>();
FileMetadata meta = null; FileMetadata meta = null;
try { try {
meta = store.retrieveMetadata(key); meta = store.retrieveMetadata(key);
@ -2804,31 +2767,21 @@ public class NativeAzureFileSystem extends FileSystem {
throw ex; throw ex;
} }
if (meta != null) { if (meta == null) {
if (!meta.isDir()) { // There is no metadata found for the path.
LOG.debug("Did not find any metadata for path: {}", key);
throw new FileNotFoundException(f + " is not found");
}
if (!meta.isDirectory()) {
LOG.debug("Found path as a file"); LOG.debug("Found path as a file");
return new FileStatus[] { updateFileStatusPath(meta, absolutePath) };
return new FileStatus[] { newFile(meta, absolutePath) };
} }
String partialKey = null; FileMetadata[] listing;
PartialListing listing = null;
try { listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
// NOTE: We don't check for Null condition as the Store API should return // NOTE: We don't check for Null condition as the Store API should return
// an empty list if there are not listing. // an empty list if there are not listing.
@ -2839,66 +2792,68 @@ public class NativeAzureFileSystem extends FileSystem {
// If any renames were redone, get another listing, // If any renames were redone, get another listing,
// since the current one may have changed due to the redo. // since the current one may have changed due to the redo.
if (renamed) { if (renamed) {
listing = null; listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
} }
throw ex; // We only need to check for AZURE_TEMP_FOLDER if the key is the root,
} // and if it is not the root we also know the exact size of the array
} // of FileStatus.
// NOTE: We don't check for Null condition as the Store API should return FileMetadata[] result = null;
// and empty list if there are not listing.
for (FileMetadata fileMetadata : listing.getFiles()) { if (key.equals("/")) {
Path subpath = keyToPath(fileMetadata.getKey()); ArrayList<FileMetadata> status = new ArrayList<>(listing.length);
// Test whether the metadata represents a file or directory and for (FileMetadata fileMetadata : listing) {
// add the appropriate metadata object. if (fileMetadata.isDirectory()) {
//
// Note: There was a very old bug here where directories were added
// to the status set as files flattening out recursive listings
// using "-lsr" down the file system hierarchy.
if (fileMetadata.isDir()) {
// Make sure we hide the temp upload folder // Make sure we hide the temp upload folder
if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
// Don't expose that. // Don't expose that.
continue; continue;
} }
status.add(newDirectory(fileMetadata, subpath)); status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
} else { } else {
status.add(newFile(fileMetadata, subpath)); status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
} }
} }
result = status.toArray(new FileMetadata[0]);
} else {
for (int i = 0; i < listing.length; i++) {
FileMetadata fileMetadata = listing[i];
listing[i] = updateFileStatusPath(fileMetadata, fileMetadata.getPath());
}
result = listing;
}
LOG.debug("Found path as a directory with {}" LOG.debug("Found path as a directory with {}"
+ " files in it.", status.size()); + " files in it.", result.length);
} else { return result;
// There is no metadata found for the path.
LOG.debug("Did not find any metadata for path: {}", key);
throw new FileNotFoundException(f + " is not found");
} }
return status.toArray(new FileStatus[0]); private FileMetadata[] listWithErrorHandling(String prefix, final int maxListingCount,
final int maxListingDepth) throws IOException {
try {
return store.list(prefix, maxListingCount, maxListingDepth);
} catch (IOException ex) {
Throwable innerException
= NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystemHelper.isFileNotFoundException(
(StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", prefix));
}
throw ex;
}
} }
// Redo any folder renames needed if there are rename pending files in the // Redo any folder renames needed if there are rename pending files in the
// directory listing. Return true if one or more redo operations were done. // directory listing. Return true if one or more redo operations were done.
private boolean conditionalRedoFolderRenames(PartialListing listing) private boolean conditionalRedoFolderRenames(FileMetadata[] listing)
throws IllegalArgumentException, IOException { throws IllegalArgumentException, IOException {
boolean renamed = false; boolean renamed = false;
for (FileMetadata fileMetadata : listing.getFiles()) { for (FileMetadata fileMetadata : listing) {
Path subpath = keyToPath(fileMetadata.getKey()); Path subpath = fileMetadata.getPath();
if (isRenamePendingFile(subpath)) { if (isRenamePendingFile(subpath)) {
FolderRenamePending pending = FolderRenamePending pending =
new FolderRenamePending(subpath, this); new FolderRenamePending(subpath, this);
@ -2914,32 +2869,11 @@ public class NativeAzureFileSystem extends FileSystem {
return path.toString().endsWith(FolderRenamePending.SUFFIX); return path.toString().endsWith(FolderRenamePending.SUFFIX);
} }
private FileStatus newFile(FileMetadata meta, Path path) { private FileMetadata updateFileStatusPath(FileMetadata meta, Path path) {
return new FileStatus ( meta.setPath(path.makeQualified(getUri(), getWorkingDirectory()));
meta.getLength(), // reduce memory use by setting the internal-only key to null
false, meta.removeKey();
1, return meta;
blockSize,
meta.getLastModified(),
0,
meta.getPermissionStatus().getPermission(),
meta.getPermissionStatus().getUserName(),
meta.getPermissionStatus().getGroupName(),
path.makeQualified(getUri(), getWorkingDirectory()));
}
private FileStatus newDirectory(FileMetadata meta, Path path) {
return new FileStatus (
0,
true,
1,
blockSize,
meta == null ? 0 : meta.getLastModified(),
0,
meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
meta == null ? "" : meta.getPermissionStatus().getUserName(),
meta == null ? "" : meta.getPermissionStatus().getGroupName(),
path.makeQualified(getUri(), getWorkingDirectory()));
} }
private static enum UMaskApplyMode { private static enum UMaskApplyMode {
@ -3000,8 +2934,8 @@ public class NativeAzureFileSystem extends FileSystem {
String currentKey = pathToKey(current); String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey); FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
if (currentMetadata != null && currentMetadata.isDir()) { if (currentMetadata != null && currentMetadata.isDirectory()) {
Path ancestor = keyToPath(currentMetadata.getKey()); Path ancestor = currentMetadata.getPath();
LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString()); LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
return ancestor; return ancestor;
} }
@ -3052,7 +2986,7 @@ public class NativeAzureFileSystem extends FileSystem {
current = parent, parent = current.getParent()) { current = parent, parent = current.getParent()) {
String currentKey = pathToKey(current); String currentKey = pathToKey(current);
FileMetadata currentMetadata = store.retrieveMetadata(currentKey); FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
if (currentMetadata != null && !currentMetadata.isDir()) { if (currentMetadata != null && !currentMetadata.isDirectory()) {
throw new FileAlreadyExistsException("Cannot create directory " + f + " because " throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
+ current + " is an existing file."); + current + " is an existing file.");
} else if (currentMetadata == null) { } else if (currentMetadata == null) {
@ -3099,7 +3033,7 @@ public class NativeAzureFileSystem extends FileSystem {
if (meta == null) { if (meta == null) {
throw new FileNotFoundException(f.toString()); throw new FileNotFoundException(f.toString());
} }
if (meta.isDir()) { if (meta.isDirectory()) {
throw new FileNotFoundException(f.toString() throw new FileNotFoundException(f.toString()
+ " is a directory not a file."); + " is a directory not a file.");
} }
@ -3120,7 +3054,7 @@ public class NativeAzureFileSystem extends FileSystem {
} }
return new FSDataInputStream(new BufferedFSInputStream( return new FSDataInputStream(new BufferedFSInputStream(
new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize)); new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
} }
@Override @Override
@ -3196,7 +3130,7 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
if (dstMetadata != null && dstMetadata.isDir()) { if (dstMetadata != null && dstMetadata.isDirectory()) {
// It's an existing directory. // It's an existing directory.
performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename", performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename",
absoluteDstPath); absoluteDstPath);
@ -3232,7 +3166,7 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Parent of the destination {}" LOG.debug("Parent of the destination {}"
+ " doesn't exist, failing the rename.", dst); + " doesn't exist, failing the rename.", dst);
return false; return false;
} else if (!parentOfDestMetadata.isDir()) { } else if (!parentOfDestMetadata.isDirectory()) {
LOG.debug("Parent of the destination {}" LOG.debug("Parent of the destination {}"
+ " is a file, failing the rename.", dst); + " is a file, failing the rename.", dst);
return false; return false;
@ -3261,7 +3195,7 @@ public class NativeAzureFileSystem extends FileSystem {
// Source doesn't exist // Source doesn't exist
LOG.debug("Source {} doesn't exist, failing the rename.", src); LOG.debug("Source {} doesn't exist, failing the rename.", src);
return false; return false;
} else if (!srcMetadata.isDir()) { } else if (!srcMetadata.isDirectory()) {
LOG.debug("Source {} found as a file, renaming.", src); LOG.debug("Source {} found as a file, renaming.", src);
try { try {
// HADOOP-15086 - file rename must ensure that the destination does // HADOOP-15086 - file rename must ensure that the destination does
@ -3335,7 +3269,7 @@ public class NativeAzureFileSystem extends FileSystem {
// single file. In this case, the parent folder no longer exists if the // single file. In this case, the parent folder no longer exists if the
// file is renamed; so we can safely ignore the null pointer case. // file is renamed; so we can safely ignore the null pointer case.
if (parentMetadata != null) { if (parentMetadata != null) {
if (parentMetadata.isDir() if (parentMetadata.isDirectory()
&& parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
store.storeEmptyFolder(parentKey, store.storeEmptyFolder(parentKey,
createPermissionStatus(FsPermission.getDefault())); createPermissionStatus(FsPermission.getDefault()));
@ -3511,7 +3445,7 @@ public class NativeAzureFileSystem extends FileSystem {
&& !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) { && !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) {
//Check if the user is the owner of the file. //Check if the user is the owner of the file.
String owner = metadata.getPermissionStatus().getUserName(); String owner = metadata.getOwner();
if (!currentUgi.getShortUserName().equals(owner)) { if (!currentUgi.getShortUserName().equals(owner)) {
throw new WasbAuthorizationException( throw new WasbAuthorizationException(
String.format("user '%s' does not have the privilege to " String.format("user '%s' does not have the privilege to "
@ -3522,16 +3456,16 @@ public class NativeAzureFileSystem extends FileSystem {
} }
permission = applyUMask(permission, permission = applyUMask(permission,
metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory metadata.isDirectory() ? UMaskApplyMode.ChangeExistingDirectory
: UMaskApplyMode.ChangeExistingFile); : UMaskApplyMode.ChangeExistingFile);
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it. // It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, createPermissionStatus(permission)); store.storeEmptyFolder(key, createPermissionStatus(permission));
} else if (!metadata.getPermissionStatus().getPermission(). } else if (!metadata.getPermission().
equals(permission)) { equals(permission)) {
store.changePermissionStatus(key, new PermissionStatus( store.changePermissionStatus(key, new PermissionStatus(
metadata.getPermissionStatus().getUserName(), metadata.getOwner(),
metadata.getPermissionStatus().getGroupName(), metadata.getGroup(),
permission)); permission));
} }
} }
@ -3579,10 +3513,10 @@ public class NativeAzureFileSystem extends FileSystem {
PermissionStatus newPermissionStatus = new PermissionStatus( PermissionStatus newPermissionStatus = new PermissionStatus(
username == null ? username == null ?
metadata.getPermissionStatus().getUserName() : username, metadata.getOwner() : username,
groupname == null ? groupname == null ?
metadata.getPermissionStatus().getGroupName() : groupname, metadata.getGroup() : groupname,
metadata.getPermissionStatus().getPermission()); metadata.getPermission());
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
// It's an implicit folder, need to materialize it. // It's an implicit folder, need to materialize it.
store.storeEmptyFolder(key, newPermissionStatus); store.storeEmptyFolder(key, newPermissionStatus);
@ -3778,13 +3712,11 @@ public class NativeAzureFileSystem extends FileSystem {
AZURE_TEMP_EXPIRY_DEFAULT) * 1000; AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
// Go over all the blobs under the given root and look for blobs to // Go over all the blobs under the given root and look for blobs to
// recover. // recover.
String priorLastKey = null; FileMetadata[] listing = store.list(pathToKey(root), AZURE_LIST_ALL,
do { AZURE_UNBOUNDED_DEPTH);
PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
AZURE_UNBOUNDED_DEPTH, priorLastKey);
for (FileMetadata file : listing.getFiles()) { for (FileMetadata file : listing) {
if (!file.isDir()) { // We don't recover directory blobs if (!file.isDirectory()) { // We don't recover directory blobs
// See if this blob has a link in it (meaning it's a place-holder // See if this blob has a link in it (meaning it's a place-holder
// blob for when the upload to the temp blob is complete). // blob for when the upload to the temp blob is complete).
String link = store.getLinkInFileMetadata(file.getKey()); String link = store.getLinkInFileMetadata(file.getKey());
@ -3793,15 +3725,13 @@ public class NativeAzureFileSystem extends FileSystem {
// existent and old enough to be considered dangling. // existent and old enough to be considered dangling.
FileMetadata linkMetadata = store.retrieveMetadata(link); FileMetadata linkMetadata = store.retrieveMetadata(link);
if (linkMetadata != null if (linkMetadata != null
&& linkMetadata.getLastModified() >= cutoffForDangling) { && linkMetadata.getModificationTime() >= cutoffForDangling) {
// Found one! // Found one!
handler.handleFile(file, linkMetadata); handler.handleFile(file, linkMetadata);
} }
} }
} }
} }
priorLastKey = listing.getPriorLastKey();
} while (priorLastKey != null);
} }
/** /**
@ -3888,7 +3818,7 @@ public class NativeAzureFileSystem extends FileSystem {
meta = store.retrieveMetadata(key); meta = store.retrieveMetadata(key);
if (meta != null) { if (meta != null) {
owner = meta.getPermissionStatus().getUserName(); owner = meta.getOwner();
LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath); LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath);
} else { } else {
// meta will be null if file/folder doen not exist // meta will be null if file/folder doen not exist

View File

@ -58,20 +58,21 @@ interface NativeFileSystemStore {
boolean isAtomicRenameKey(String key); boolean isAtomicRenameKey(String key);
/**
* Returns the file block size. This is a fake value used for integration
* of the Azure store with Hadoop.
* @return The file block size.
*/
long getHadoopBlockSize();
void storeEmptyLinkFile(String key, String tempBlobKey, void storeEmptyLinkFile(String key, String tempBlobKey,
PermissionStatus permissionStatus) throws AzureException; PermissionStatus permissionStatus) throws AzureException;
String getLinkInFileMetadata(String key) throws AzureException; String getLinkInFileMetadata(String key) throws AzureException;
PartialListing list(String prefix, final int maxListingCount, FileMetadata[] list(String prefix, final int maxListingCount,
final int maxListingDepth) throws IOException; final int maxListingDepth) throws IOException;
PartialListing list(String prefix, final int maxListingCount,
final int maxListingDepth, String priorLastKey) throws IOException;
PartialListing listAll(String prefix, final int maxListingCount,
final int maxListingDepth, String priorLastKey) throws IOException;
void changePermissionStatus(String key, PermissionStatus newPermission) void changePermissionStatus(String key, PermissionStatus newPermission)
throws AzureException; throws AzureException;

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.classification.InterfaceAudience;
/**
 * <p>
 * Holds one chunk of a directory listing produced by a
 * {@link NativeFileSystemStore}: the {@link FileMetadata files} and the
 * names of the directories found under a prefix.
 * </p>
 * <p>
 * Listings may be returned in chunks; the <code>priorLastKey</code> is the
 * continuation token with which the next chunk can be requested.
 * </p>
 */
@InterfaceAudience.Private
class PartialListing {

  /** Files and directories contained in this chunk of the listing. */
  private final FileMetadata[] files;

  /** Common key prefixes (sub-directory names) found in this chunk. */
  private final String[] commonPrefixes;

  /** Continuation key for requesting the next chunk; null when complete. */
  private final String priorLastKey;

  public PartialListing(String priorLastKey, FileMetadata[] files,
      String[] commonPrefixes) {
    this.priorLastKey = priorLastKey;
    this.files = files;
    this.commonPrefixes = commonPrefixes;
  }

  /** @return the files and directories in this chunk. */
  public FileMetadata[] getFiles() {
    return files;
  }

  /** @return the common prefixes (sub-directory names) in this chunk. */
  public String[] getCommonPrefixes() {
    return commonPrefixes;
  }

  /** @return the continuation key for fetching the next chunk. */
  public String getPriorLastKey() {
    return priorLastKey;
  }
}

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils;
/**
* Test list performance.
*/
/**
 * Scale test of listing performance against a directory holding many blobs.
 * Methods run in name order: files are created first (test_0101), listed
 * with listStatus() and listFiles() (test_0200), then bulk-deleted
 * (test_0300).
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ITestListPerformance extends AbstractAzureScaleTest {
  private static final Logger LOG = LoggerFactory.getLogger(
      ITestListPerformance.class);

  private static final Path TEST_DIR_PATH = new Path(
      "DirectoryWithManyFiles");

  // Defaults; overridable via the test configuration properties below.
  private static final int NUMBER_OF_THREADS = 10;
  private static final int NUMBER_OF_FILES_PER_THREAD = 1000;

  private int threads;
  private int filesPerThread;
  private int expectedFileCount;

  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = getConfiguration();
    // fail fast
    threads = AzureTestUtils.getTestPropertyInt(conf,
        "fs.azure.scale.test.list.performance.threads", NUMBER_OF_THREADS);
    filesPerThread = AzureTestUtils.getTestPropertyInt(conf,
        "fs.azure.scale.test.list.performance.files",
        NUMBER_OF_FILES_PER_THREAD);
    expectedFileCount = threads * filesPerThread;
    LOG.info("Thread = {}, Files per Thread = {}, expected files = {}",
        threads, filesPerThread, expectedFileCount);
    conf.set("fs.azure.io.retry.max.retries", "1");
    conf.set("fs.azure.delete.threads", "16");
    createTestAccount();
  }

  @Override
  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
    return AzureBlobStorageTestAccount.create(
        "itestlistperformance",
        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
        null,
        true);
  }

  /**
   * Create {@code threads * filesPerThread} empty blobs directly through the
   * Azure SDK (bypassing the filesystem) so the later listing tests have a
   * large directory to enumerate.
   */
  @Test
  public void test_0101_CreateDirectoryWithFiles() throws Exception {
    Assume.assumeFalse("Test path exists; skipping", fs.exists(TEST_DIR_PATH));

    ExecutorService executorService = Executors.newFixedThreadPool(threads);
    try {
      CloudBlobContainer container = testAccount.getRealContainer();

      // key prefix of the test directory, relative to the container root
      final String basePath = (fs.getWorkingDirectory().toUri().getPath()
          + "/" + TEST_DIR_PATH + "/").substring(1);

      ArrayList<Callable<Integer>> tasks = new ArrayList<>(threads);

      fs.mkdirs(TEST_DIR_PATH);

      ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
      for (int i = 0; i < threads; i++) {
        tasks.add(
            new Callable<Integer>() {
              public Integer call() {
                int written = 0;
                for (int j = 0; j < filesPerThread; j++) {
                  String blobName = basePath + UUID.randomUUID().toString();
                  try {
                    CloudBlockBlob blob = container.getBlockBlobReference(
                        blobName);
                    blob.uploadText("");
                    written++;
                  } catch (Exception e) {
                    LOG.error("Failed to write {}", blobName, e);
                    break;
                  }
                }
                LOG.info("Thread completed with {} files written", written);
                return written;
              }
            }
        );
      }

      List<Future<Integer>> futures = executorService.invokeAll(tasks,
          getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
      long elapsedMs = timer.elapsedTimeMs();
      LOG.info("time to create files: {} millis", elapsedMs);

      for (Future<Integer> future : futures) {
        assertTrue("Future timed out", future.isDone());
        assertEquals("Future did not write all files",
            filesPerThread, future.get().intValue());
      }
    } finally {
      // release the pool's threads even if blob creation fails
      executorService.shutdownNow();
    }
  }

  /**
   * Time a flat listStatus() of the big directory, then a recursive
   * listFiles(), and verify both enumerations agree on the file set.
   */
  @Test
  public void test_0200_ListStatusPerformance() throws Exception {
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    FileStatus[] fileList = fs.listStatus(TEST_DIR_PATH);
    long elapsedMs = timer.elapsedTimeMs();
    LOG.info(String.format(
        "files=%1$d, elapsedMs=%2$d",
        fileList.length,
        elapsedMs));
    Map<Path, FileStatus> foundInList = new HashMap<>(expectedFileCount);

    for (FileStatus fileStatus : fileList) {
      foundInList.put(fileStatus.getPath(), fileStatus);
      LOG.info("{}: {}", fileStatus.getPath(),
          fileStatus.isDirectory() ? "dir" : "file");
    }
    assertEquals("Mismatch between expected files and actual",
        expectedFileCount, fileList.length);

    // now do a listFiles() recursive
    ContractTestUtils.NanoTimer initialStatusCallTimer
        = new ContractTestUtils.NanoTimer();
    RemoteIterator<LocatedFileStatus> listing
        = fs.listFiles(TEST_DIR_PATH, true);
    long initialListTime = initialStatusCallTimer.elapsedTimeMs();
    timer = new ContractTestUtils.NanoTimer();
    while (listing.hasNext()) {
      FileStatus fileStatus = listing.next();
      Path path = fileStatus.getPath();
      FileStatus removed = foundInList.remove(path);
      assertNotNull("Did not find " + path + " in the previous listing",
          removed);
    }
    elapsedMs = timer.elapsedTimeMs();
    LOG.info("time for listFiles() initial call: {} millis;"
        + " time to iterate: {} millis", initialListTime, elapsedMs);
    assertEquals("Not all files from listStatus() were found in listFiles()",
        0, foundInList.size());
  }

  /**
   * Time a recursive delete of the whole test directory and log the
   * per-file cost.
   */
  @Test
  public void test_0300_BulkDeletePerformance() throws Exception {
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    fs.delete(TEST_DIR_PATH, true);
    long elapsedMs = timer.elapsedTimeMs();
    LOG.info("time for delete(): {} millis; {} nanoS per file",
        elapsedMs, timer.nanosPerOperation(expectedFileCount));
  }
}