HADOOP-15547/ WASB: improve listStatus performance.
Contributed by Thomas Marquardt.
This commit is contained in:
parent
76b8beb289
commit
749fff577e
|
@ -47,4 +47,14 @@
|
|||
<Bug pattern="WMI_WRONG_MAP_ITERATOR" />
|
||||
<Priority value="2" />
|
||||
</Match>
|
||||
|
||||
<!-- FileMetadata is used internally for storing metadata but also
|
||||
subclasses FileStatus to reduce allocations when listing a large number
|
||||
of files. When it is returned to an external caller as a FileStatus, the
|
||||
extra metadata is no longer useful and we want the equals and hashCode
|
||||
methods of FileStatus to be used. -->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.fs.azure.FileMetadata" />
|
||||
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
|
||||
</Match>
|
||||
</FindBugsFilter>
|
||||
|
|
|
@ -43,6 +43,8 @@
|
|||
<fs.azure.scale.test.huge.partitionsize>unset</fs.azure.scale.test.huge.partitionsize>
|
||||
<!-- Timeout in seconds for scale tests.-->
|
||||
<fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>10</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>1000</fs.azure.scale.test.list.performance.files>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
|
@ -298,6 +300,8 @@
|
|||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
|
||||
</systemPropertyVariables>
|
||||
<includes>
|
||||
<include>**/Test*.java</include>
|
||||
|
@ -326,6 +330,8 @@
|
|||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
|
||||
</systemPropertyVariables>
|
||||
<includes>
|
||||
<include>**/TestRollingWindowAverage*.java</include>
|
||||
|
@ -367,6 +373,8 @@
|
|||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
|
||||
</systemPropertyVariables>
|
||||
<!-- Some tests cannot run in parallel. Tests that cover -->
|
||||
<!-- access to the root directory must run in isolation -->
|
||||
|
@ -412,6 +420,8 @@
|
|||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
|
||||
</systemPropertyVariables>
|
||||
<includes>
|
||||
<include>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</include>
|
||||
|
@ -454,6 +464,8 @@
|
|||
<fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
|
||||
<fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
|
||||
<fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
|
||||
<fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
|
||||
<fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
|
||||
</systemPropertyVariables>
|
||||
<forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
|
||||
<trimStackTrace>false</trimStackTrace>
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.net.URISyntaxException;
|
|||
import java.net.URLDecoder;
|
||||
import java.net.URLEncoder;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
|
@ -128,6 +127,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// computed as min(2*cpu,8)
|
||||
private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
|
||||
|
||||
private static final String HADOOP_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
|
||||
private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
|
||||
private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
|
||||
private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
|
||||
|
@ -252,6 +252,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// Default block sizes
|
||||
public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
|
||||
public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
|
||||
public static final long DEFAULT_HADOOP_BLOCK_SIZE = 512 * 1024 * 1024L;
|
||||
|
||||
private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
|
||||
|
||||
|
@ -313,6 +314,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
|
||||
|
||||
private long hadoopBlockSize = DEFAULT_HADOOP_BLOCK_SIZE;
|
||||
private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
|
||||
private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
|
||||
private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
|
||||
|
@ -740,6 +742,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
|
||||
this.uploadBlockSizeBytes = sessionConfiguration.getInt(
|
||||
KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
|
||||
this.hadoopBlockSize = sessionConfiguration.getLong(
|
||||
HADOOP_BLOCK_SIZE_PROPERTY_NAME, DEFAULT_HADOOP_BLOCK_SIZE);
|
||||
|
||||
this.inputStreamVersion = sessionConfiguration.getInt(
|
||||
KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
|
||||
|
@ -1234,7 +1238,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the file block size. This is a fake value used for integration
|
||||
* of the Azure store with Hadoop.
|
||||
*/
|
||||
@Override
|
||||
public long getHadoopBlockSize() {
|
||||
return hadoopBlockSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* This should be called from any method that does any modifications to the
|
||||
|
@ -2066,7 +2077,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// The key refers to root directory of container.
|
||||
// Set the modification time for root to zero.
|
||||
return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
|
||||
BlobMaterialization.Implicit);
|
||||
BlobMaterialization.Implicit, hadoopBlockSize);
|
||||
}
|
||||
|
||||
CloudBlobWrapper blob = getBlobReference(key);
|
||||
|
@ -2086,7 +2097,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
if (retrieveFolderAttribute(blob)) {
|
||||
LOG.debug("{} is a folder blob.", key);
|
||||
return new FileMetadata(key, properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob), BlobMaterialization.Explicit);
|
||||
getPermissionStatus(blob), BlobMaterialization.Explicit, hadoopBlockSize);
|
||||
} else {
|
||||
|
||||
LOG.debug("{} is a normal blob.", key);
|
||||
|
@ -2095,7 +2106,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
key, // Always return denormalized key with metadata.
|
||||
getDataLength(blob, properties),
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob));
|
||||
getPermissionStatus(blob), hadoopBlockSize);
|
||||
}
|
||||
} catch(StorageException e){
|
||||
if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) {
|
||||
|
@ -2129,7 +2140,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
BlobProperties properties = blob.getProperties();
|
||||
|
||||
return new FileMetadata(key, properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob), BlobMaterialization.Implicit);
|
||||
getPermissionStatus(blob), BlobMaterialization.Implicit, hadoopBlockSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2178,46 +2189,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public PartialListing list(String prefix, final int maxListingCount,
|
||||
public FileMetadata[] list(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth) throws IOException {
|
||||
return list(prefix, maxListingCount, maxListingDepth, null);
|
||||
return listInternal(prefix, maxListingCount, maxListingDepth);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartialListing list(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth, String priorLastKey) throws IOException {
|
||||
return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
|
||||
priorLastKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartialListing listAll(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth, String priorLastKey) throws IOException {
|
||||
return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Searches the given list of {@link FileMetadata} objects for a directory
|
||||
* with the given key.
|
||||
*
|
||||
* @param list
|
||||
* The list to search.
|
||||
* @param key
|
||||
* The key to search for.
|
||||
* @return The wanted directory, or null if not found.
|
||||
*/
|
||||
private static FileMetadata getFileMetadataInList(
|
||||
final Iterable<FileMetadata> list, String key) {
|
||||
for (FileMetadata current : list) {
|
||||
if (current.getKey().equals(key)) {
|
||||
return current;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private PartialListing list(String prefix, String delimiter,
|
||||
final int maxListingCount, final int maxListingDepth, String priorLastKey)
|
||||
private FileMetadata[] listInternal(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth)
|
||||
throws IOException {
|
||||
try {
|
||||
checkContainer(ContainerAccessType.PureRead);
|
||||
|
@ -2241,7 +2219,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
objects = listRootBlobs(prefix, true, enableFlatListing);
|
||||
}
|
||||
|
||||
ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
|
||||
HashMap<String, FileMetadata> fileMetadata = new HashMap<>(256);
|
||||
|
||||
for (ListBlobItem blobItem : objects) {
|
||||
// Check that the maximum listing count is not exhausted.
|
||||
//
|
||||
|
@ -2261,25 +2240,37 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
FileMetadata metadata;
|
||||
if (retrieveFolderAttribute(blob)) {
|
||||
metadata = new FileMetadata(blobKey,
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob),
|
||||
BlobMaterialization.Explicit);
|
||||
metadata = new FileMetadata(blobKey,
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob),
|
||||
BlobMaterialization.Explicit,
|
||||
hadoopBlockSize);
|
||||
} else {
|
||||
metadata = new FileMetadata(
|
||||
blobKey,
|
||||
getDataLength(blob, properties),
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob));
|
||||
metadata = new FileMetadata(
|
||||
blobKey,
|
||||
getDataLength(blob, properties),
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob),
|
||||
hadoopBlockSize);
|
||||
}
|
||||
// Add the metadata but remove duplicates. Note that the azure
|
||||
// storage java SDK returns two types of entries: CloudBlobWrappter
|
||||
// and CloudDirectoryWrapper. In the case where WASB generated the
|
||||
// data, there will be an empty blob for each "directory", and we will
|
||||
// receive a CloudBlobWrapper. If there are also files within this
|
||||
// "directory", we will also receive a CloudDirectoryWrapper. To
|
||||
// complicate matters, the data may not be generated by WASB, in
|
||||
// which case we may not have an empty blob for each "directory".
|
||||
// So, sometimes we receive both a CloudBlobWrapper and a
|
||||
// CloudDirectoryWrapper for each directory, and sometimes we receive
|
||||
// one or the other but not both. We remove duplicates, but
|
||||
// prefer CloudBlobWrapper over CloudDirectoryWrapper.
|
||||
// Furthermore, it is very unfortunate that the list results are not
|
||||
// ordered, and it is a partial list which uses continuation. So
|
||||
// the HashMap is the best structure to remove the duplicates, despite
|
||||
// its potential large size.
|
||||
fileMetadata.put(blobKey, metadata);
|
||||
|
||||
// Add the metadata to the list, but remove any existing duplicate
|
||||
// entries first that we may have added by finding nested files.
|
||||
FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
|
||||
if (existing != null) {
|
||||
fileMetadata.remove(existing);
|
||||
}
|
||||
fileMetadata.add(metadata);
|
||||
} else if (blobItem instanceof CloudBlobDirectoryWrapper) {
|
||||
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
|
||||
// Determine format of directory name depending on whether an absolute
|
||||
|
@ -2298,12 +2289,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// inherit the permissions of the first non-directory blob.
|
||||
// Also, getting a proper value for last-modified is tricky.
|
||||
FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
|
||||
defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit);
|
||||
defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit,
|
||||
hadoopBlockSize);
|
||||
|
||||
// Add the directory metadata to the list only if it's not already
|
||||
// there.
|
||||
if (getFileMetadataInList(fileMetadata, dirKey) == null) {
|
||||
fileMetadata.add(directoryMetadata);
|
||||
// there. See earlier note, we prefer CloudBlobWrapper over
|
||||
// CloudDirectoryWrapper because it may have additional metadata (
|
||||
// properties and ACLs).
|
||||
if (!fileMetadata.containsKey(dirKey)) {
|
||||
fileMetadata.put(dirKey, directoryMetadata);
|
||||
}
|
||||
|
||||
if (!enableFlatListing) {
|
||||
|
@ -2314,13 +2308,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
}
|
||||
}
|
||||
// Note: Original code indicated that this may be a hack.
|
||||
priorLastKey = null;
|
||||
PartialListing listing = new PartialListing(priorLastKey,
|
||||
fileMetadata.toArray(new FileMetadata[] {}),
|
||||
0 == fileMetadata.size() ? new String[] {}
|
||||
: new String[] { prefix });
|
||||
return listing;
|
||||
return fileMetadata.values().toArray(new FileMetadata[fileMetadata.size()]);
|
||||
} catch (Exception e) {
|
||||
// Re-throw as an Azure storage exception.
|
||||
//
|
||||
|
@ -2334,13 +2322,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
* the sorted order of the blob names.
|
||||
*
|
||||
* @param aCloudBlobDirectory Azure blob directory
|
||||
* @param aFileMetadataList a list of file metadata objects for each
|
||||
* @param metadataHashMap a map of file metadata objects for each
|
||||
* non-directory blob.
|
||||
* @param maxListingCount maximum length of the built up list.
|
||||
*/
|
||||
private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
|
||||
ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount,
|
||||
final int maxListingDepth) throws Exception {
|
||||
HashMap<String, FileMetadata> metadataHashMap, final int maxListingCount,
|
||||
final int maxListingDepth) throws Exception {
|
||||
|
||||
// Push the blob directory onto the stack.
|
||||
//
|
||||
|
@ -2371,12 +2359,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// (2) maxListingCount > 0 implies that the number of items in the
|
||||
// metadata list is less than the max listing count.
|
||||
while (null != blobItemIterator
|
||||
&& (maxListingCount <= 0 || aFileMetadataList.size() < maxListingCount)) {
|
||||
&& (maxListingCount <= 0 || metadataHashMap.size() < maxListingCount)) {
|
||||
while (blobItemIterator.hasNext()) {
|
||||
// Check if the count of items on the list exhausts the maximum
|
||||
// listing count.
|
||||
//
|
||||
if (0 < maxListingCount && aFileMetadataList.size() >= maxListingCount) {
|
||||
if (0 < maxListingCount && metadataHashMap.size() >= maxListingCount) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2399,22 +2387,34 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
metadata = new FileMetadata(blobKey,
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob),
|
||||
BlobMaterialization.Explicit);
|
||||
BlobMaterialization.Explicit,
|
||||
hadoopBlockSize);
|
||||
} else {
|
||||
metadata = new FileMetadata(
|
||||
blobKey,
|
||||
getDataLength(blob, properties),
|
||||
properties.getLastModified().getTime(),
|
||||
getPermissionStatus(blob));
|
||||
getPermissionStatus(blob),
|
||||
hadoopBlockSize);
|
||||
}
|
||||
|
||||
// Add the directory metadata to the list only if it's not already
|
||||
// there.
|
||||
FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey);
|
||||
if (existing != null) {
|
||||
aFileMetadataList.remove(existing);
|
||||
}
|
||||
aFileMetadataList.add(metadata);
|
||||
// Add the metadata but remove duplicates. Note that the azure
|
||||
// storage java SDK returns two types of entries: CloudBlobWrappter
|
||||
// and CloudDirectoryWrapper. In the case where WASB generated the
|
||||
// data, there will be an empty blob for each "directory", and we will
|
||||
// receive a CloudBlobWrapper. If there are also files within this
|
||||
// "directory", we will also receive a CloudDirectoryWrapper. To
|
||||
// complicate matters, the data may not be generated by WASB, in
|
||||
// which case we may not have an empty blob for each "directory".
|
||||
// So, sometimes we receive both a CloudBlobWrapper and a
|
||||
// CloudDirectoryWrapper for each directory, and sometimes we receive
|
||||
// one or the other but not both. We remove duplicates, but
|
||||
// prefer CloudBlobWrapper over CloudDirectoryWrapper.
|
||||
// Furthermore, it is very unfortunate that the list results are not
|
||||
// ordered, and it is a partial list which uses continuation. So
|
||||
// the HashMap is the best structure to remove the duplicates, despite
|
||||
// its potential large size.
|
||||
metadataHashMap.put(blobKey, metadata);
|
||||
} else if (blobItem instanceof CloudBlobDirectoryWrapper) {
|
||||
CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
|
||||
|
||||
|
@ -2439,7 +2439,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// absolute path is being used or not.
|
||||
String dirKey = normalizeKey(directory);
|
||||
|
||||
if (getFileMetadataInList(aFileMetadataList, dirKey) == null) {
|
||||
// Add the directory metadata to the list only if it's not already
|
||||
// there. See earlier note, we prefer CloudBlobWrapper over
|
||||
// CloudDirectoryWrapper because it may have additional metadata (
|
||||
// properties and ACLs).
|
||||
if (!metadataHashMap.containsKey(dirKey)) {
|
||||
|
||||
// Reached the targeted listing depth. Return metadata for the
|
||||
// directory using default permissions.
|
||||
//
|
||||
|
@ -2450,10 +2455,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
FileMetadata directoryMetadata = new FileMetadata(dirKey,
|
||||
0,
|
||||
defaultPermissionNoBlobMetadata(),
|
||||
BlobMaterialization.Implicit);
|
||||
BlobMaterialization.Implicit,
|
||||
hadoopBlockSize);
|
||||
|
||||
// Add the directory metadata to the list.
|
||||
aFileMetadataList.add(directoryMetadata);
|
||||
metadataHashMap.put(dirKey, directoryMetadata);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
|
||||
/**
|
||||
|
@ -27,12 +29,9 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
|
|||
* </p>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class FileMetadata {
|
||||
private final String key;
|
||||
private final long length;
|
||||
private final long lastModified;
|
||||
private final boolean isDir;
|
||||
private final PermissionStatus permissionStatus;
|
||||
class FileMetadata extends FileStatus {
|
||||
// this is not final so that it can be cleared to save memory when not needed.
|
||||
private String key;
|
||||
private final BlobMaterialization blobMaterialization;
|
||||
|
||||
/**
|
||||
|
@ -46,16 +45,19 @@ class FileMetadata {
|
|||
* The last modified date (milliseconds since January 1, 1970 UTC.)
|
||||
* @param permissionStatus
|
||||
* The permission for the file.
|
||||
* @param blockSize
|
||||
* The Hadoop file block size.
|
||||
*/
|
||||
public FileMetadata(String key, long length, long lastModified,
|
||||
PermissionStatus permissionStatus) {
|
||||
PermissionStatus permissionStatus, final long blockSize) {
|
||||
super(length, false, 1, blockSize, lastModified, 0,
|
||||
permissionStatus.getPermission(),
|
||||
permissionStatus.getUserName(),
|
||||
permissionStatus.getGroupName(),
|
||||
null);
|
||||
this.key = key;
|
||||
this.length = length;
|
||||
this.lastModified = lastModified;
|
||||
this.isDir = false;
|
||||
this.permissionStatus = permissionStatus;
|
||||
this.blobMaterialization = BlobMaterialization.Explicit; // File are never
|
||||
// implicit.
|
||||
// Files are never implicit.
|
||||
this.blobMaterialization = BlobMaterialization.Explicit;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,37 +72,42 @@ class FileMetadata {
|
|||
* @param blobMaterialization
|
||||
* Whether this is an implicit (no real blob backing it) or explicit
|
||||
* directory.
|
||||
* @param blockSize
|
||||
* The Hadoop file block size.
|
||||
*/
|
||||
public FileMetadata(String key, long lastModified,
|
||||
PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) {
|
||||
PermissionStatus permissionStatus, BlobMaterialization blobMaterialization,
|
||||
final long blockSize) {
|
||||
super(0, true, 1, blockSize, lastModified, 0,
|
||||
permissionStatus.getPermission(),
|
||||
permissionStatus.getUserName(),
|
||||
permissionStatus.getGroupName(),
|
||||
null);
|
||||
this.key = key;
|
||||
this.isDir = true;
|
||||
this.length = 0;
|
||||
this.lastModified = lastModified;
|
||||
this.permissionStatus = permissionStatus;
|
||||
this.blobMaterialization = blobMaterialization;
|
||||
}
|
||||
|
||||
public boolean isDir() {
|
||||
return isDir;
|
||||
@Override
|
||||
public Path getPath() {
|
||||
Path p = super.getPath();
|
||||
if (p == null) {
|
||||
// Don't store this yet to reduce memory usage, as it will
|
||||
// stay in the Eden Space and later we will update it
|
||||
// with the full canonicalized path.
|
||||
p = NativeAzureFileSystem.keyToPath(key);
|
||||
}
|
||||
return p;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Azure storage key for the file. Used internally by the framework.
|
||||
*
|
||||
* @return The key for the file.
|
||||
*/
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public long getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
public long getLastModified() {
|
||||
return lastModified;
|
||||
}
|
||||
|
||||
public PermissionStatus getPermissionStatus() {
|
||||
return permissionStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates whether this is an implicit directory (no real blob backing it)
|
||||
* or an explicit one.
|
||||
|
@ -112,9 +119,7 @@ class FileMetadata {
|
|||
return blobMaterialization;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
|
||||
+ permissionStatus + "]";
|
||||
void removeKey() {
|
||||
key = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,9 +31,7 @@ import java.text.SimpleDateFormat;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -129,20 +127,12 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
this.dstKey = dstKey;
|
||||
this.folderLease = lease;
|
||||
this.fs = fs;
|
||||
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
|
||||
|
||||
// List all the files in the folder.
|
||||
long start = Time.monotonicNow();
|
||||
String priorLastKey = null;
|
||||
do {
|
||||
PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH, priorLastKey);
|
||||
for(FileMetadata file : listing.getFiles()) {
|
||||
fileMetadataList.add(file);
|
||||
}
|
||||
priorLastKey = listing.getPriorLastKey();
|
||||
} while (priorLastKey != null);
|
||||
fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
|
||||
fileMetadata = fs.getStoreInterface().list(srcKey, AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH);
|
||||
|
||||
long end = Time.monotonicNow();
|
||||
LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
|
||||
|
||||
|
@ -669,7 +659,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);
|
||||
|
||||
static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
|
||||
/**
|
||||
* The time span in seconds before which we consider a temp blob to be
|
||||
* dangling (not being actively uploaded to) and up for reclamation.
|
||||
|
@ -685,8 +674,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
private static final int AZURE_LIST_ALL = -1;
|
||||
private static final int AZURE_UNBOUNDED_DEPTH = -1;
|
||||
|
||||
private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
|
||||
|
||||
/**
|
||||
* The configuration property that determines which group owns files created
|
||||
* in WASB.
|
||||
|
@ -1196,7 +1183,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
private NativeFileSystemStore store;
|
||||
private AzureNativeFileSystemStore actualStore;
|
||||
private Path workingDir;
|
||||
private long blockSize = MAX_AZURE_BLOCK_SIZE;
|
||||
private AzureFileSystemInstrumentation instrumentation;
|
||||
private String metricsSourceName;
|
||||
private boolean isClosed = false;
|
||||
|
@ -1361,13 +1347,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
||||
this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
|
||||
this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
||||
MAX_AZURE_BLOCK_SIZE);
|
||||
|
||||
this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
|
||||
LOG.debug("NativeAzureFileSystem. Initializing.");
|
||||
LOG.debug(" blockSize = {}",
|
||||
conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
|
||||
LOG.debug(" blockSize = {}", store.getHadoopBlockSize());
|
||||
|
||||
// Initialize thread counts from user configuration
|
||||
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
|
||||
|
@ -1491,7 +1474,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
private static Path keyToPath(String key) {
|
||||
static Path keyToPath(String key) {
|
||||
if (key.equals("/")) {
|
||||
return new Path("/"); // container
|
||||
}
|
||||
|
@ -1599,7 +1582,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
throw new FileNotFoundException(f.toString());
|
||||
}
|
||||
|
||||
if (meta.isDir()) {
|
||||
if (meta.isDirectory()) {
|
||||
throw new FileNotFoundException(f.toString()
|
||||
+ " is a directory not a file.");
|
||||
}
|
||||
|
@ -1815,7 +1798,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
FileMetadata existingMetadata = store.retrieveMetadata(key);
|
||||
if (existingMetadata != null) {
|
||||
if (existingMetadata.isDir()) {
|
||||
if (existingMetadata.isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Cannot create file " + f
|
||||
+ "; already exists as a directory.");
|
||||
}
|
||||
|
@ -1833,7 +1816,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// already exists.
|
||||
String parentKey = pathToKey(parentFolder);
|
||||
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
|
||||
if (parentMetadata != null && parentMetadata.isDir() &&
|
||||
if (parentMetadata != null && parentMetadata.isDirectory() &&
|
||||
parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
|
||||
if (parentFolderLease != null) {
|
||||
store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
|
||||
|
@ -1850,7 +1833,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
firstExisting = firstExisting.getParent();
|
||||
metadata = store.retrieveMetadata(pathToKey(firstExisting));
|
||||
}
|
||||
mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
|
||||
mkdirs(parentFolder, metadata.getPermission(), true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1988,7 +1971,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
|
||||
}
|
||||
|
||||
if (!parentMetadata.isDir()) {
|
||||
if (!parentMetadata.isDirectory()) {
|
||||
// Invalid state: the parent path is actually a file. Throw.
|
||||
throw new AzureException("File " + f + " has a parent directory "
|
||||
+ parentPath + " which is also a file. Can't resolve.");
|
||||
|
@ -1997,7 +1980,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
// The path exists, determine if it is a folder containing objects,
|
||||
// an empty folder, or a simple file and take the appropriate actions.
|
||||
if (!metaFile.isDir()) {
|
||||
if (!metaFile.isDirectory()) {
|
||||
// The path specifies a file. We need to check the parent path
|
||||
// to make sure it's a proper materialized directory before we
|
||||
// delete the file. Otherwise we may get into a situation where
|
||||
|
@ -2114,9 +2097,9 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
|
||||
@Override
|
||||
public boolean execute(FileMetadata file) throws IOException{
|
||||
if (!deleteFile(file.getKey(), file.isDir())) {
|
||||
if (!deleteFile(file.getKey(), file.isDirectory())) {
|
||||
LOG.warn("Attempt to delete non-existent {} {}",
|
||||
file.isDir() ? "directory" : "file",
|
||||
file.isDirectory() ? "directory" : "file",
|
||||
file.getKey());
|
||||
}
|
||||
return true;
|
||||
|
@ -2138,7 +2121,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
// Delete the current directory if all underlying contents are deleted
|
||||
if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
|
||||
&& !deleteFile(metaFile.getKey(), metaFile.isDir()))) {
|
||||
&& !deleteFile(metaFile.getKey(), metaFile.isDirectory()))) {
|
||||
LOG.error("Failed delete directory : {}", f);
|
||||
return false;
|
||||
}
|
||||
|
@ -2191,7 +2174,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
// The path exists, determine if it is a folder containing objects,
|
||||
// an empty folder, or a simple file and take the appropriate actions.
|
||||
if (!metaFile.isDir()) {
|
||||
if (!metaFile.isDirectory()) {
|
||||
// The path specifies a file. We need to check the parent path
|
||||
// to make sure it's a proper materialized directory before we
|
||||
// delete the file. Otherwise we may get into a situation where
|
||||
|
@ -2234,7 +2217,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
|
||||
}
|
||||
|
||||
if (!parentMetadata.isDir()) {
|
||||
if (!parentMetadata.isDirectory()) {
|
||||
// Invalid state: the parent path is actually a file. Throw.
|
||||
throw new AzureException("File " + f + " has a parent directory "
|
||||
+ parentPath + " which is also a file. Can't resolve.");
|
||||
|
@ -2319,38 +2302,27 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
// List all the blobs in the current folder.
|
||||
String priorLastKey = null;
|
||||
|
||||
// Start time for list operation
|
||||
long start = Time.monotonicNow();
|
||||
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
|
||||
final FileMetadata[] contents;
|
||||
|
||||
// List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
|
||||
do {
|
||||
try {
|
||||
PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH, priorLastKey);
|
||||
for(FileMetadata file : listing.getFiles()) {
|
||||
fileMetadataList.add(file);
|
||||
}
|
||||
priorLastKey = listing.getPriorLastKey();
|
||||
} catch (IOException e) {
|
||||
Throwable innerException = checkForAzureStorageException(e);
|
||||
try {
|
||||
contents = store.list(key, AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH);
|
||||
} catch (IOException e) {
|
||||
Throwable innerException = checkForAzureStorageException(e);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
throw e;
|
||||
if (innerException instanceof StorageException
|
||||
&& isFileNotFoundException((StorageException) innerException)) {
|
||||
return false;
|
||||
}
|
||||
} while (priorLastKey != null);
|
||||
|
||||
throw e;
|
||||
}
|
||||
|
||||
long end = Time.monotonicNow();
|
||||
LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start));
|
||||
|
||||
final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
|
||||
LOG.debug("Time taken to list {} blobs for delete operation: {} ms", contents.length, (end - start));
|
||||
|
||||
if (contents.length > 0) {
|
||||
if (!recursive) {
|
||||
|
@ -2365,9 +2337,9 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
|
||||
@Override
|
||||
public boolean execute(FileMetadata file) throws IOException{
|
||||
if (!deleteFile(file.getKey(), file.isDir())) {
|
||||
if (!deleteFile(file.getKey(), file.isDirectory())) {
|
||||
LOG.warn("Attempt to delete non-existent {} {}",
|
||||
file.isDir() ? "directory" : "file",
|
||||
file.isDirectory() ? "directory" : "file",
|
||||
file.getKey());
|
||||
}
|
||||
return true;
|
||||
|
@ -2384,7 +2356,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
// Delete the current directory
|
||||
if (store.retrieveMetadata(metaFile.getKey()) != null
|
||||
&& !deleteFile(metaFile.getKey(), metaFile.isDir())) {
|
||||
&& !deleteFile(metaFile.getKey(), metaFile.isDirectory())) {
|
||||
LOG.error("Failed delete directory : {}", f);
|
||||
return false;
|
||||
}
|
||||
|
@ -2456,13 +2428,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
boolean isPartialDelete = false;
|
||||
|
||||
Path pathToDelete = makeAbsolute(keyToPath(folderToDelete.getKey()));
|
||||
Path pathToDelete = makeAbsolute(folderToDelete.getPath());
|
||||
foldersToProcess.push(folderToDelete);
|
||||
|
||||
while (!foldersToProcess.empty()) {
|
||||
|
||||
FileMetadata currentFolder = foldersToProcess.pop();
|
||||
Path currentPath = makeAbsolute(keyToPath(currentFolder.getKey()));
|
||||
Path currentPath = makeAbsolute(currentFolder.getPath());
|
||||
boolean canDeleteChildren = true;
|
||||
|
||||
// If authorization is enabled, check for 'write' permission on current folder
|
||||
|
@ -2478,8 +2450,8 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
if (canDeleteChildren) {
|
||||
|
||||
// get immediate children list
|
||||
ArrayList<FileMetadata> fileMetadataList = getChildrenMetadata(currentFolder.getKey(),
|
||||
maxListingDepth);
|
||||
FileMetadata[] fileMetadataList = store.list(currentFolder.getKey(),
|
||||
AZURE_LIST_ALL, maxListingDepth);
|
||||
|
||||
// Process children of currentFolder and add them to list of contents
|
||||
// that can be deleted. We Perform stickybit check on every file and
|
||||
|
@ -2490,12 +2462,12 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// This file/folder cannot be deleted and neither can the parent paths be deleted.
|
||||
// Remove parent paths from list of contents that can be deleted.
|
||||
canDeleteChildren = false;
|
||||
Path filePath = makeAbsolute(keyToPath(childItem.getKey()));
|
||||
Path filePath = makeAbsolute(childItem.getPath());
|
||||
LOG.error("User does not have permissions to delete {}. "
|
||||
+ "Parent directory has sticky bit set.", filePath);
|
||||
} else {
|
||||
// push the child directories to the stack to process their contents
|
||||
if (childItem.isDir()) {
|
||||
if (childItem.isDirectory()) {
|
||||
foldersToProcess.push(childItem);
|
||||
}
|
||||
// Add items to list of contents that can be deleted.
|
||||
|
@ -2540,23 +2512,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
return isPartialDelete;
|
||||
}
|
||||
|
||||
private ArrayList<FileMetadata> getChildrenMetadata(String key, int maxListingDepth)
|
||||
throws IOException {
|
||||
|
||||
String priorLastKey = null;
|
||||
ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
|
||||
do {
|
||||
PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
|
||||
maxListingDepth, priorLastKey);
|
||||
for (FileMetadata file : listing.getFiles()) {
|
||||
fileMetadataList.add(file);
|
||||
}
|
||||
priorLastKey = listing.getPriorLastKey();
|
||||
} while (priorLastKey != null);
|
||||
|
||||
return fileMetadataList;
|
||||
}
|
||||
|
||||
private boolean isStickyBitCheckViolated(FileMetadata metaData,
|
||||
FileMetadata parentMetadata, boolean throwOnException) throws IOException {
|
||||
try {
|
||||
|
@ -2602,13 +2557,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
// stickybit is not set on parent and hence cannot be violated
|
||||
if (!parentMetadata.getPermissionStatus().getPermission().getStickyBit()) {
|
||||
if (!parentMetadata.getPermission().getStickyBit()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
String parentDirectoryOwner = parentMetadata.getPermissionStatus().getUserName();
|
||||
String currentFileOwner = metaData.getPermissionStatus().getUserName();
|
||||
String parentDirectoryOwner = parentMetadata.getOwner();
|
||||
String currentFileOwner = metaData.getOwner();
|
||||
|
||||
// Files/Folders with no owner set will not pass stickybit check
|
||||
if ((parentDirectoryOwner.equalsIgnoreCase(currentUser))
|
||||
|
@ -2687,7 +2642,15 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
Path absolutePath = makeAbsolute(f);
|
||||
String key = pathToKey(absolutePath);
|
||||
if (key.length() == 0) { // root always exists
|
||||
return newDirectory(null, absolutePath);
|
||||
return new FileStatus(
|
||||
0,
|
||||
true,
|
||||
1,
|
||||
store.getHadoopBlockSize(),
|
||||
0,
|
||||
0,
|
||||
FsPermission.getDefault(), "", "",
|
||||
absolutePath.makeQualified(getUri(), getWorkingDirectory()));
|
||||
}
|
||||
|
||||
// The path is either a folder or a file. Retrieve metadata to
|
||||
|
@ -2709,7 +2672,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
if (meta != null) {
|
||||
if (meta.isDir()) {
|
||||
if (meta.isDirectory()) {
|
||||
// The path is a folder with files in it.
|
||||
//
|
||||
|
||||
|
@ -2723,14 +2686,14 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
// Return reference to the directory object.
|
||||
return newDirectory(meta, absolutePath);
|
||||
return updateFileStatusPath(meta, absolutePath);
|
||||
}
|
||||
|
||||
// The path is a file.
|
||||
LOG.debug("Found the path: {} as a file.", f.toString());
|
||||
|
||||
// Return with reference to a file object.
|
||||
return newFile(meta, absolutePath);
|
||||
return updateFileStatusPath(meta, absolutePath);
|
||||
}
|
||||
|
||||
// File not found. Throw exception no such file or directory.
|
||||
|
@ -2787,7 +2750,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath);
|
||||
|
||||
String key = pathToKey(absolutePath);
|
||||
Set<FileStatus> status = new TreeSet<FileStatus>();
|
||||
|
||||
FileMetadata meta = null;
|
||||
try {
|
||||
meta = store.retrieveMetadata(key);
|
||||
|
@ -2804,101 +2767,93 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
throw ex;
|
||||
}
|
||||
|
||||
if (meta != null) {
|
||||
if (!meta.isDir()) {
|
||||
if (meta == null) {
|
||||
// There is no metadata found for the path.
|
||||
LOG.debug("Did not find any metadata for path: {}", key);
|
||||
throw new FileNotFoundException(f + " is not found");
|
||||
}
|
||||
|
||||
LOG.debug("Found path as a file");
|
||||
if (!meta.isDirectory()) {
|
||||
LOG.debug("Found path as a file");
|
||||
return new FileStatus[] { updateFileStatusPath(meta, absolutePath) };
|
||||
}
|
||||
|
||||
return new FileStatus[] { newFile(meta, absolutePath) };
|
||||
}
|
||||
FileMetadata[] listing;
|
||||
|
||||
String partialKey = null;
|
||||
PartialListing listing = null;
|
||||
listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
|
||||
|
||||
try {
|
||||
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
|
||||
} catch (IOException ex) {
|
||||
// NOTE: We don't check for Null condition as the Store API should return
|
||||
// an empty list if there are not listing.
|
||||
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
// For any -RenamePending.json files in the listing,
|
||||
// push the rename forward.
|
||||
boolean renamed = conditionalRedoFolderRenames(listing);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
// If any renames were redone, get another listing,
|
||||
// since the current one may have changed due to the redo.
|
||||
if (renamed) {
|
||||
listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
|
||||
}
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
// We only need to check for AZURE_TEMP_FOLDER if the key is the root,
|
||||
// and if it is not the root we also know the exact size of the array
|
||||
// of FileStatus.
|
||||
|
||||
throw ex;
|
||||
}
|
||||
// NOTE: We don't check for Null condition as the Store API should return
|
||||
// an empty list if there are not listing.
|
||||
FileMetadata[] result = null;
|
||||
|
||||
// For any -RenamePending.json files in the listing,
|
||||
// push the rename forward.
|
||||
boolean renamed = conditionalRedoFolderRenames(listing);
|
||||
if (key.equals("/")) {
|
||||
ArrayList<FileMetadata> status = new ArrayList<>(listing.length);
|
||||
|
||||
// If any renames were redone, get another listing,
|
||||
// since the current one may have changed due to the redo.
|
||||
if (renamed) {
|
||||
listing = null;
|
||||
try {
|
||||
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
|
||||
|
||||
throw new FileNotFoundException(String.format("%s is not found", key));
|
||||
}
|
||||
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: We don't check for Null condition as the Store API should return
|
||||
// and empty list if there are not listing.
|
||||
|
||||
for (FileMetadata fileMetadata : listing.getFiles()) {
|
||||
Path subpath = keyToPath(fileMetadata.getKey());
|
||||
|
||||
// Test whether the metadata represents a file or directory and
|
||||
// add the appropriate metadata object.
|
||||
//
|
||||
// Note: There was a very old bug here where directories were added
|
||||
// to the status set as files flattening out recursive listings
|
||||
// using "-lsr" down the file system hierarchy.
|
||||
if (fileMetadata.isDir()) {
|
||||
for (FileMetadata fileMetadata : listing) {
|
||||
if (fileMetadata.isDirectory()) {
|
||||
// Make sure we hide the temp upload folder
|
||||
if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
|
||||
// Don't expose that.
|
||||
continue;
|
||||
}
|
||||
status.add(newDirectory(fileMetadata, subpath));
|
||||
status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
|
||||
} else {
|
||||
status.add(newFile(fileMetadata, subpath));
|
||||
status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("Found path as a directory with {}"
|
||||
+ " files in it.", status.size());
|
||||
|
||||
result = status.toArray(new FileMetadata[0]);
|
||||
} else {
|
||||
// There is no metadata found for the path.
|
||||
LOG.debug("Did not find any metadata for path: {}", key);
|
||||
|
||||
throw new FileNotFoundException(f + " is not found");
|
||||
for (int i = 0; i < listing.length; i++) {
|
||||
FileMetadata fileMetadata = listing[i];
|
||||
listing[i] = updateFileStatusPath(fileMetadata, fileMetadata.getPath());
|
||||
}
|
||||
result = listing;
|
||||
}
|
||||
|
||||
return status.toArray(new FileStatus[0]);
|
||||
LOG.debug("Found path as a directory with {}"
|
||||
+ " files in it.", result.length);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private FileMetadata[] listWithErrorHandling(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth) throws IOException {
|
||||
try {
|
||||
return store.list(prefix, maxListingCount, maxListingDepth);
|
||||
} catch (IOException ex) {
|
||||
Throwable innerException
|
||||
= NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
|
||||
if (innerException instanceof StorageException
|
||||
&& NativeAzureFileSystemHelper.isFileNotFoundException(
|
||||
(StorageException) innerException)) {
|
||||
throw new FileNotFoundException(String.format("%s is not found", prefix));
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
// Redo any folder renames needed if there are rename pending files in the
|
||||
// directory listing. Return true if one or more redo operations were done.
|
||||
private boolean conditionalRedoFolderRenames(PartialListing listing)
|
||||
private boolean conditionalRedoFolderRenames(FileMetadata[] listing)
|
||||
throws IllegalArgumentException, IOException {
|
||||
boolean renamed = false;
|
||||
for (FileMetadata fileMetadata : listing.getFiles()) {
|
||||
Path subpath = keyToPath(fileMetadata.getKey());
|
||||
for (FileMetadata fileMetadata : listing) {
|
||||
Path subpath = fileMetadata.getPath();
|
||||
if (isRenamePendingFile(subpath)) {
|
||||
FolderRenamePending pending =
|
||||
new FolderRenamePending(subpath, this);
|
||||
|
@ -2914,32 +2869,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
return path.toString().endsWith(FolderRenamePending.SUFFIX);
|
||||
}
|
||||
|
||||
private FileStatus newFile(FileMetadata meta, Path path) {
|
||||
return new FileStatus (
|
||||
meta.getLength(),
|
||||
false,
|
||||
1,
|
||||
blockSize,
|
||||
meta.getLastModified(),
|
||||
0,
|
||||
meta.getPermissionStatus().getPermission(),
|
||||
meta.getPermissionStatus().getUserName(),
|
||||
meta.getPermissionStatus().getGroupName(),
|
||||
path.makeQualified(getUri(), getWorkingDirectory()));
|
||||
}
|
||||
|
||||
private FileStatus newDirectory(FileMetadata meta, Path path) {
|
||||
return new FileStatus (
|
||||
0,
|
||||
true,
|
||||
1,
|
||||
blockSize,
|
||||
meta == null ? 0 : meta.getLastModified(),
|
||||
0,
|
||||
meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
|
||||
meta == null ? "" : meta.getPermissionStatus().getUserName(),
|
||||
meta == null ? "" : meta.getPermissionStatus().getGroupName(),
|
||||
path.makeQualified(getUri(), getWorkingDirectory()));
|
||||
private FileMetadata updateFileStatusPath(FileMetadata meta, Path path) {
|
||||
meta.setPath(path.makeQualified(getUri(), getWorkingDirectory()));
|
||||
// reduce memory use by setting the internal-only key to null
|
||||
meta.removeKey();
|
||||
return meta;
|
||||
}
|
||||
|
||||
private static enum UMaskApplyMode {
|
||||
|
@ -3000,8 +2934,8 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
String currentKey = pathToKey(current);
|
||||
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
|
||||
if (currentMetadata != null && currentMetadata.isDir()) {
|
||||
Path ancestor = keyToPath(currentMetadata.getKey());
|
||||
if (currentMetadata != null && currentMetadata.isDirectory()) {
|
||||
Path ancestor = currentMetadata.getPath();
|
||||
LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
|
||||
return ancestor;
|
||||
}
|
||||
|
@ -3052,7 +2986,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
current = parent, parent = current.getParent()) {
|
||||
String currentKey = pathToKey(current);
|
||||
FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
|
||||
if (currentMetadata != null && !currentMetadata.isDir()) {
|
||||
if (currentMetadata != null && !currentMetadata.isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
|
||||
+ current + " is an existing file.");
|
||||
} else if (currentMetadata == null) {
|
||||
|
@ -3099,7 +3033,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
if (meta == null) {
|
||||
throw new FileNotFoundException(f.toString());
|
||||
}
|
||||
if (meta.isDir()) {
|
||||
if (meta.isDirectory()) {
|
||||
throw new FileNotFoundException(f.toString()
|
||||
+ " is a directory not a file.");
|
||||
}
|
||||
|
@ -3120,7 +3054,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
return new FSDataInputStream(new BufferedFSInputStream(
|
||||
new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
|
||||
new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3196,7 +3130,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
if (dstMetadata != null && dstMetadata.isDir()) {
|
||||
if (dstMetadata != null && dstMetadata.isDirectory()) {
|
||||
// It's an existing directory.
|
||||
performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename",
|
||||
absoluteDstPath);
|
||||
|
@ -3232,7 +3166,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
LOG.debug("Parent of the destination {}"
|
||||
+ " doesn't exist, failing the rename.", dst);
|
||||
return false;
|
||||
} else if (!parentOfDestMetadata.isDir()) {
|
||||
} else if (!parentOfDestMetadata.isDirectory()) {
|
||||
LOG.debug("Parent of the destination {}"
|
||||
+ " is a file, failing the rename.", dst);
|
||||
return false;
|
||||
|
@ -3261,7 +3195,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// Source doesn't exist
|
||||
LOG.debug("Source {} doesn't exist, failing the rename.", src);
|
||||
return false;
|
||||
} else if (!srcMetadata.isDir()) {
|
||||
} else if (!srcMetadata.isDirectory()) {
|
||||
LOG.debug("Source {} found as a file, renaming.", src);
|
||||
try {
|
||||
// HADOOP-15086 - file rename must ensure that the destination does
|
||||
|
@ -3335,7 +3269,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// single file. In this case, the parent folder no longer exists if the
|
||||
// file is renamed; so we can safely ignore the null pointer case.
|
||||
if (parentMetadata != null) {
|
||||
if (parentMetadata.isDir()
|
||||
if (parentMetadata.isDirectory()
|
||||
&& parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
|
||||
store.storeEmptyFolder(parentKey,
|
||||
createPermissionStatus(FsPermission.getDefault()));
|
||||
|
@ -3511,7 +3445,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
&& !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) {
|
||||
|
||||
//Check if the user is the owner of the file.
|
||||
String owner = metadata.getPermissionStatus().getUserName();
|
||||
String owner = metadata.getOwner();
|
||||
if (!currentUgi.getShortUserName().equals(owner)) {
|
||||
throw new WasbAuthorizationException(
|
||||
String.format("user '%s' does not have the privilege to "
|
||||
|
@ -3522,16 +3456,16 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
permission = applyUMask(permission,
|
||||
metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
|
||||
metadata.isDirectory() ? UMaskApplyMode.ChangeExistingDirectory
|
||||
: UMaskApplyMode.ChangeExistingFile);
|
||||
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
|
||||
// It's an implicit folder, need to materialize it.
|
||||
store.storeEmptyFolder(key, createPermissionStatus(permission));
|
||||
} else if (!metadata.getPermissionStatus().getPermission().
|
||||
} else if (!metadata.getPermission().
|
||||
equals(permission)) {
|
||||
store.changePermissionStatus(key, new PermissionStatus(
|
||||
metadata.getPermissionStatus().getUserName(),
|
||||
metadata.getPermissionStatus().getGroupName(),
|
||||
metadata.getOwner(),
|
||||
metadata.getGroup(),
|
||||
permission));
|
||||
}
|
||||
}
|
||||
|
@ -3579,10 +3513,10 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
|
||||
PermissionStatus newPermissionStatus = new PermissionStatus(
|
||||
username == null ?
|
||||
metadata.getPermissionStatus().getUserName() : username,
|
||||
metadata.getOwner() : username,
|
||||
groupname == null ?
|
||||
metadata.getPermissionStatus().getGroupName() : groupname,
|
||||
metadata.getPermissionStatus().getPermission());
|
||||
metadata.getGroup() : groupname,
|
||||
metadata.getPermission());
|
||||
if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
|
||||
// It's an implicit folder, need to materialize it.
|
||||
store.storeEmptyFolder(key, newPermissionStatus);
|
||||
|
@ -3778,30 +3712,26 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
|
||||
// Go over all the blobs under the given root and look for blobs to
|
||||
// recover.
|
||||
String priorLastKey = null;
|
||||
do {
|
||||
PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH, priorLastKey);
|
||||
FileMetadata[] listing = store.list(pathToKey(root), AZURE_LIST_ALL,
|
||||
AZURE_UNBOUNDED_DEPTH);
|
||||
|
||||
for (FileMetadata file : listing.getFiles()) {
|
||||
if (!file.isDir()) { // We don't recover directory blobs
|
||||
// See if this blob has a link in it (meaning it's a place-holder
|
||||
// blob for when the upload to the temp blob is complete).
|
||||
String link = store.getLinkInFileMetadata(file.getKey());
|
||||
if (link != null) {
|
||||
// It has a link, see if the temp blob it is pointing to is
|
||||
// existent and old enough to be considered dangling.
|
||||
FileMetadata linkMetadata = store.retrieveMetadata(link);
|
||||
if (linkMetadata != null
|
||||
&& linkMetadata.getLastModified() >= cutoffForDangling) {
|
||||
// Found one!
|
||||
handler.handleFile(file, linkMetadata);
|
||||
}
|
||||
for (FileMetadata file : listing) {
|
||||
if (!file.isDirectory()) { // We don't recover directory blobs
|
||||
// See if this blob has a link in it (meaning it's a place-holder
|
||||
// blob for when the upload to the temp blob is complete).
|
||||
String link = store.getLinkInFileMetadata(file.getKey());
|
||||
if (link != null) {
|
||||
// It has a link, see if the temp blob it is pointing to is
|
||||
// existent and old enough to be considered dangling.
|
||||
FileMetadata linkMetadata = store.retrieveMetadata(link);
|
||||
if (linkMetadata != null
|
||||
&& linkMetadata.getModificationTime() >= cutoffForDangling) {
|
||||
// Found one!
|
||||
handler.handleFile(file, linkMetadata);
|
||||
}
|
||||
}
|
||||
}
|
||||
priorLastKey = listing.getPriorLastKey();
|
||||
} while (priorLastKey != null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3888,7 +3818,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
meta = store.retrieveMetadata(key);
|
||||
|
||||
if (meta != null) {
|
||||
owner = meta.getPermissionStatus().getUserName();
|
||||
owner = meta.getOwner();
|
||||
LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath);
|
||||
} else {
|
||||
// meta will be null if file/folder doen not exist
|
||||
|
|
|
@ -58,20 +58,21 @@ interface NativeFileSystemStore {
|
|||
|
||||
boolean isAtomicRenameKey(String key);
|
||||
|
||||
/**
|
||||
* Returns the file block size. This is a fake value used for integration
|
||||
* of the Azure store with Hadoop.
|
||||
* @return The file block size.
|
||||
*/
|
||||
long getHadoopBlockSize();
|
||||
|
||||
void storeEmptyLinkFile(String key, String tempBlobKey,
|
||||
PermissionStatus permissionStatus) throws AzureException;
|
||||
|
||||
String getLinkInFileMetadata(String key) throws AzureException;
|
||||
|
||||
PartialListing list(String prefix, final int maxListingCount,
|
||||
FileMetadata[] list(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth) throws IOException;
|
||||
|
||||
PartialListing list(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth, String priorLastKey) throws IOException;
|
||||
|
||||
PartialListing listAll(String prefix, final int maxListingCount,
|
||||
final int maxListingDepth, String priorLastKey) throws IOException;
|
||||
|
||||
void changePermissionStatus(String key, PermissionStatus newPermission)
|
||||
throws AzureException;
|
||||
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Holds information on a directory listing for a {@link NativeFileSystemStore}.
|
||||
* This includes the {@link FileMetadata files} and directories (their names)
|
||||
* contained in a directory.
|
||||
* </p>
|
||||
* <p>
|
||||
* This listing may be returned in chunks, so a <code>priorLastKey</code> is
|
||||
* provided so that the next chunk may be requested.
|
||||
* </p>
|
||||
*
|
||||
* @see NativeFileSystemStore#list(String, int, String)
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class PartialListing {
|
||||
|
||||
private final String priorLastKey;
|
||||
private final FileMetadata[] files;
|
||||
private final String[] commonPrefixes;
|
||||
|
||||
public PartialListing(String priorLastKey, FileMetadata[] files,
|
||||
String[] commonPrefixes) {
|
||||
this.priorLastKey = priorLastKey;
|
||||
this.files = files;
|
||||
this.commonPrefixes = commonPrefixes;
|
||||
}
|
||||
|
||||
public FileMetadata[] getFiles() {
|
||||
return files;
|
||||
}
|
||||
|
||||
public String[] getCommonPrefixes() {
|
||||
return commonPrefixes;
|
||||
}
|
||||
|
||||
public String getPriorLastKey() {
|
||||
return priorLastKey;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import org.junit.Assume;
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Test;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
|
||||
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
|
||||
/**
|
||||
* Test list performance.
|
||||
*/
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
|
||||
public class ITestListPerformance extends AbstractAzureScaleTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
ITestListPerformance.class);
|
||||
|
||||
private static final Path TEST_DIR_PATH = new Path(
|
||||
"DirectoryWithManyFiles");
|
||||
|
||||
private static final int NUMBER_OF_THREADS = 10;
|
||||
private static final int NUMBER_OF_FILES_PER_THREAD = 1000;
|
||||
|
||||
private int threads;
|
||||
|
||||
private int filesPerThread;
|
||||
|
||||
private int expectedFileCount;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
Configuration conf = getConfiguration();
|
||||
// fail fast
|
||||
threads = AzureTestUtils.getTestPropertyInt(conf,
|
||||
"fs.azure.scale.test.list.performance.threads", NUMBER_OF_THREADS);
|
||||
filesPerThread = AzureTestUtils.getTestPropertyInt(conf,
|
||||
"fs.azure.scale.test.list.performance.files", NUMBER_OF_FILES_PER_THREAD);
|
||||
expectedFileCount = threads * filesPerThread;
|
||||
LOG.info("Thread = {}, Files per Thread = {}, expected files = {}",
|
||||
threads, filesPerThread, expectedFileCount);
|
||||
conf.set("fs.azure.io.retry.max.retries", "1");
|
||||
conf.set("fs.azure.delete.threads", "16");
|
||||
createTestAccount();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
return AzureBlobStorageTestAccount.create(
|
||||
"itestlistperformance",
|
||||
EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
|
||||
null,
|
||||
true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_0101_CreateDirectoryWithFiles() throws Exception {
|
||||
Assume.assumeFalse("Test path exists; skipping", fs.exists(TEST_DIR_PATH));
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(threads);
|
||||
CloudBlobContainer container = testAccount.getRealContainer();
|
||||
|
||||
final String basePath = (fs.getWorkingDirectory().toUri().getPath() + "/" + TEST_DIR_PATH + "/").substring(1);
|
||||
|
||||
ArrayList<Callable<Integer>> tasks = new ArrayList<>(threads);
|
||||
fs.mkdirs(TEST_DIR_PATH);
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
for (int i = 0; i < threads; i++) {
|
||||
tasks.add(
|
||||
new Callable<Integer>() {
|
||||
public Integer call() {
|
||||
int written = 0;
|
||||
for (int j = 0; j < filesPerThread; j++) {
|
||||
String blobName = basePath + UUID.randomUUID().toString();
|
||||
try {
|
||||
CloudBlockBlob blob = container.getBlockBlobReference(
|
||||
blobName);
|
||||
blob.uploadText("");
|
||||
written ++;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Filed to write {}", blobName, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.info("Thread completed with {} files written", written);
|
||||
return written;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
List<Future<Integer>> futures = executorService.invokeAll(tasks,
|
||||
getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||
long elapsedMs = timer.elapsedTimeMs();
|
||||
LOG.info("time to create files: {} millis", elapsedMs);
|
||||
|
||||
for (Future<Integer> future : futures) {
|
||||
assertTrue("Future timed out", future.isDone());
|
||||
assertEquals("Future did not write all files timed out",
|
||||
filesPerThread, future.get().intValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_0200_ListStatusPerformance() throws Exception {
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
FileStatus[] fileList = fs.listStatus(TEST_DIR_PATH);
|
||||
long elapsedMs = timer.elapsedTimeMs();
|
||||
LOG.info(String.format(
|
||||
"files=%1$d, elapsedMs=%2$d",
|
||||
fileList.length,
|
||||
elapsedMs));
|
||||
Map<Path, FileStatus> foundInList =new HashMap<>(expectedFileCount);
|
||||
|
||||
for (FileStatus fileStatus : fileList) {
|
||||
foundInList.put(fileStatus.getPath(), fileStatus);
|
||||
LOG.info("{}: {}", fileStatus.getPath(),
|
||||
fileStatus.isDirectory() ? "dir" : "file");
|
||||
}
|
||||
assertEquals("Mismatch between expected files and actual",
|
||||
expectedFileCount, fileList.length);
|
||||
|
||||
|
||||
// now do a listFiles() recursive
|
||||
ContractTestUtils.NanoTimer initialStatusCallTimer
|
||||
= new ContractTestUtils.NanoTimer();
|
||||
RemoteIterator<LocatedFileStatus> listing
|
||||
= fs.listFiles(TEST_DIR_PATH, true);
|
||||
long initialListTime = initialStatusCallTimer.elapsedTimeMs();
|
||||
timer = new ContractTestUtils.NanoTimer();
|
||||
while (listing.hasNext()) {
|
||||
FileStatus fileStatus = listing.next();
|
||||
Path path = fileStatus.getPath();
|
||||
FileStatus removed = foundInList.remove(path);
|
||||
assertNotNull("Did not find " + path + "{} in the previous listing",
|
||||
removed);
|
||||
}
|
||||
elapsedMs = timer.elapsedTimeMs();
|
||||
LOG.info("time for listFiles() initial call: {} millis;"
|
||||
+ " time to iterate: {} millis", initialListTime, elapsedMs);
|
||||
assertEquals("Not all files from listStatus() were found in listFiles()",
|
||||
0, foundInList.size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_0300_BulkDeletePerformance() throws Exception {
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
fs.delete(TEST_DIR_PATH,true);
|
||||
long elapsedMs = timer.elapsedTimeMs();
|
||||
LOG.info("time for delete(): {} millis; {} nanoS per file",
|
||||
elapsedMs, timer.nanosPerOperation(expectedFileCount));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue