HADOOP-12551. Introduce FileNotFoundException for WASB FileSystem API. Contributed by Dushyanth.

(cherry picked from commit 0e76f1fcea)
(cherry picked from commit 813841e10b)
This commit is contained in:
cnauroth 2016-01-09 22:18:11 -08:00
parent f0f8e3ee7c
commit e26f702866
5 changed files with 783 additions and 111 deletions

View File

@ -914,6 +914,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12678. Handle empty rename pending metadata file during atomic rename HADOOP-12678. Handle empty rename pending metadata file during atomic rename
in redo path. (Madhumita Chakraborty via cnauroth) in redo path. (Madhumita Chakraborty via cnauroth)
HADOOP-12551. Introduce FileNotFoundException for WASB FileSystem API
(Dushyanth via cnauroth)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -79,6 +79,7 @@ import com.microsoft.azure.storage.StorageErrorCode;
import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageErrorCodeStrings;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
/** /**
@ -176,7 +177,7 @@ public class NativeAzureFileSystem extends FileSystem {
} catch (JsonParseException e) { } catch (JsonParseException e) {
this.committed = false; this.committed = false;
} catch (IOException e) { } catch (IOException e) {
this.committed = false; this.committed = false;
} }
if (!this.committed) { if (!this.committed) {
@ -198,11 +199,11 @@ public class NativeAzureFileSystem extends FileSystem {
this.srcKey = oldFolderName.getTextValue(); this.srcKey = oldFolderName.getTextValue();
this.dstKey = newFolderName.getTextValue(); this.dstKey = newFolderName.getTextValue();
if (this.srcKey == null || this.dstKey == null) { if (this.srcKey == null || this.dstKey == null) {
this.committed = false; this.committed = false;
} else { } else {
JsonNode fileList = json.get("FileList"); JsonNode fileList = json.get("FileList");
if (fileList == null) { if (fileList == null) {
this.committed = false; this.committed = false;
} else { } else {
for (int i = 0; i < fileList.size(); i++) { for (int i = 0; i < fileList.size(); i++) {
fileStrList.add(fileList.get(i).getTextValue()); fileStrList.add(fileList.get(i).getTextValue());
@ -727,7 +728,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result; return result;
} catch(IOException e) { } catch(IOException e) {
Throwable innerException = checkForAzureStorageException(e); Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException) { if (innerException instanceof StorageException) {
@ -735,7 +736,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}", + " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode()); key, e, ((StorageException) innerException).getErrorCode());
if (isFileNotFoundException((StorageException) innerException)) { if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key)); throw new FileNotFoundException(String.format("%s is not found", key));
} }
} }
@ -781,7 +782,7 @@ public class NativeAzureFileSystem extends FileSystem {
return result; return result;
} catch(IOException e) { } catch(IOException e) {
Throwable innerException = checkForAzureStorageException(e); Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException) { if (innerException instanceof StorageException) {
@ -789,7 +790,7 @@ public class NativeAzureFileSystem extends FileSystem {
+ " Exception details: {} Error Code : {}", + " Exception details: {} Error Code : {}",
key, e, ((StorageException) innerException).getErrorCode()); key, e, ((StorageException) innerException).getErrorCode());
if (isFileNotFoundException((StorageException) innerException)) { if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key)); throw new FileNotFoundException(String.format("%s is not found", key));
} }
} }
@ -821,10 +822,10 @@ public class NativeAzureFileSystem extends FileSystem {
this.pos); this.pos);
} catch(IOException e) { } catch(IOException e) {
Throwable innerException = checkForAzureStorageException(e); Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException if (innerException instanceof StorageException
&& isFileNotFoundException((StorageException) innerException)) { && NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key)); throw new FileNotFoundException(String.format("%s is not found", key));
} }
@ -842,40 +843,6 @@ public class NativeAzureFileSystem extends FileSystem {
return false; return false;
} }
/*
* Helper method to recursively check if the cause of the exception is
* a Azure storage exception.
*/
private Throwable checkForAzureStorageException(IOException e) {
Throwable innerException = e.getCause();
while (innerException != null
&& !(innerException instanceof StorageException)) {
innerException = innerException.getCause();
}
return innerException;
}
/*
* Helper method to check if the AzureStorageException is
* because backing blob was not found.
*/
private boolean isFileNotFoundException(StorageException e) {
String errorCode = ((StorageException) e).getErrorCode();
if (errorCode != null
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
return true;
}
return false;
}
/* /*
* Helper method to check if a stream is closed. * Helper method to check if a stream is closed.
@ -1605,7 +1572,20 @@ public class NativeAzureFileSystem extends FileSystem {
// Capture the metadata for the path. // Capture the metadata for the path.
// //
FileMetadata metaFile = store.retrieveMetadata(key); FileMetadata metaFile = null;
try {
metaFile = store.retrieveMetadata(key);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
return false;
}
throw e;
}
if (null == metaFile) { if (null == metaFile) {
// The path to be deleted does not exist. // The path to be deleted does not exist.
@ -1625,12 +1605,44 @@ public class NativeAzureFileSystem extends FileSystem {
Path parentPath = absolutePath.getParent(); Path parentPath = absolutePath.getParent();
if (parentPath.getParent() != null) {// Not root if (parentPath.getParent() != null) {// Not root
String parentKey = pathToKey(parentPath); String parentKey = pathToKey(parentPath);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
FileMetadata parentMetadata = null;
try {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
// A FileNotFoundException is not thrown here as the API returns false
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
}
throw e;
}
// Invalid State.
// A FileNotFoundException is not thrown here as the API returns false
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (parentMetadata == null) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
if (!parentMetadata.isDir()) { if (!parentMetadata.isDir()) {
// Invalid state: the parent path is actually a file. Throw. // Invalid state: the parent path is actually a file. Throw.
throw new AzureException("File " + f + " has a parent directory " throw new AzureException("File " + f + " has a parent directory "
+ parentPath + " which is also a file. Can't resolve."); + parentPath + " which is also a file. Can't resolve.");
} }
if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
LOG.debug("Found an implicit parent directory while trying to" LOG.debug("Found an implicit parent directory while trying to"
+ " delete the file {}. Creating the directory blob for" + " delete the file {}. Creating the directory blob for"
@ -1644,8 +1656,21 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
} }
store.delete(key);
instrumentation.fileDeleted(); try {
store.delete(key);
instrumentation.fileDeleted();
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
return false;
}
throw e;
}
} else { } else {
// The path specifies a folder. Recursively delete all entries under the // The path specifies a folder. Recursively delete all entries under the
// folder. // folder.
@ -1653,7 +1678,37 @@ public class NativeAzureFileSystem extends FileSystem {
Path parentPath = absolutePath.getParent(); Path parentPath = absolutePath.getParent();
if (parentPath.getParent() != null) { if (parentPath.getParent() != null) {
String parentKey = pathToKey(parentPath); String parentKey = pathToKey(parentPath);
FileMetadata parentMetadata = store.retrieveMetadata(parentKey); FileMetadata parentMetadata = null;
try {
parentMetadata = store.retrieveMetadata(parentKey);
} catch (IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException) {
// Invalid State.
// A FileNotFoundException is not thrown here as the API returns false
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
}
throw e;
}
// Invalid State.
// A FileNotFoundException is not thrown here as the API returns false
// if the file not present. But not retrieving metadata here is an
// unrecoverable state and can only happen if there is a race condition
// hence throwing a IOException
if (parentMetadata == null) {
throw new IOException("File " + f + " has a parent directory "
+ parentPath + " whose metadata cannot be retrieved. Can't resolve");
}
if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
LOG.debug("Found an implicit parent directory while trying to" LOG.debug("Found an implicit parent directory while trying to"
@ -1667,8 +1722,26 @@ public class NativeAzureFileSystem extends FileSystem {
// List all the blobs in the current folder. // List all the blobs in the current folder.
String priorLastKey = null; String priorLastKey = null;
PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1, PartialListing listing = null;
priorLastKey); try {
listing = store.listAll(key, AZURE_LIST_ALL, 1,
priorLastKey);
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
return false;
}
throw e;
}
if (listing == null) {
return false;
}
FileMetadata[] contents = listing.getFiles(); FileMetadata[] contents = listing.getFiles();
if (!recursive && contents.length > 0) { if (!recursive && contents.length > 0) {
// The folder is non-empty and recursive delete was not specified. // The folder is non-empty and recursive delete was not specified.
@ -1685,8 +1758,20 @@ public class NativeAzureFileSystem extends FileSystem {
String suffix = p.getKey().substring( String suffix = p.getKey().substring(
p.getKey().lastIndexOf(PATH_DELIMITER)); p.getKey().lastIndexOf(PATH_DELIMITER));
if (!p.isDir()) { if (!p.isDir()) {
store.delete(key + suffix); try {
instrumentation.fileDeleted(); store.delete(key + suffix);
instrumentation.fileDeleted();
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
return false;
}
throw e;
}
} else { } else {
// Recursively delete contents of the sub-folders. Notice this also // Recursively delete contents of the sub-folders. Notice this also
// deletes the blob for the directory. // deletes the blob for the directory.
@ -1695,7 +1780,20 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
} }
store.delete(key);
try {
store.delete(key);
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(e);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
return false;
}
throw e;
}
// Update parent directory last modified time // Update parent directory last modified time
Path parent = absolutePath.getParent(); Path parent = absolutePath.getParent();
@ -1713,7 +1811,7 @@ public class NativeAzureFileSystem extends FileSystem {
} }
@Override @Override
public FileStatus getFileStatus(Path f) throws IOException { public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("Getting the file status for {}", f.toString()); LOG.debug("Getting the file status for {}", f.toString());
@ -1726,7 +1824,22 @@ public class NativeAzureFileSystem extends FileSystem {
// The path is either a folder or a file. Retrieve metadata to // The path is either a folder or a file. Retrieve metadata to
// determine if it is a directory or file. // determine if it is a directory or file.
FileMetadata meta = store.retrieveMetadata(key); FileMetadata meta = null;
try {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
if (meta != null) { if (meta != null) {
if (meta.isDir()) { if (meta.isDir()) {
// The path is a folder with files in it. // The path is a folder with files in it.
@ -1797,14 +1910,28 @@ public class NativeAzureFileSystem extends FileSystem {
* contained files if it is a directory. * contained files if it is a directory.
*/ */
@Override @Override
public FileStatus[] listStatus(Path f) throws IOException { public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("Listing status for {}", f.toString()); LOG.debug("Listing status for {}", f.toString());
Path absolutePath = makeAbsolute(f); Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
Set<FileStatus> status = new TreeSet<FileStatus>(); Set<FileStatus> status = new TreeSet<FileStatus>();
FileMetadata meta = store.retrieveMetadata(key); FileMetadata meta = null;
try {
meta = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", f));
}
throw ex;
}
if (meta != null) { if (meta != null) {
if (!meta.isDir()) { if (!meta.isDir()) {
@ -1813,8 +1940,26 @@ public class NativeAzureFileSystem extends FileSystem {
return new FileStatus[] { newFile(meta, absolutePath) }; return new FileStatus[] { newFile(meta, absolutePath) };
} }
String partialKey = null; String partialKey = null;
PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); PartialListing listing = null;
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
// NOTE: We don't check for Null condition as the Store API should return
// an empty list if there are not listing.
// For any -RenamePending.json files in the listing, // For any -RenamePending.json files in the listing,
// push the rename forward. // push the rename forward.
@ -1823,9 +1968,25 @@ public class NativeAzureFileSystem extends FileSystem {
// If any renames were redone, get another listing, // If any renames were redone, get another listing,
// since the current one may have changed due to the redo. // since the current one may have changed due to the redo.
if (renamed) { if (renamed) {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); listing = null;
try {
listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
} }
// NOTE: We don't check for Null condition as the Store API should return
// and empty list if there are not listing.
for (FileMetadata fileMetadata : listing.getFiles()) { for (FileMetadata fileMetadata : listing.getFiles()) {
Path subpath = keyToPath(fileMetadata.getKey()); Path subpath = keyToPath(fileMetadata.getKey());
@ -2024,13 +2185,28 @@ public class NativeAzureFileSystem extends FileSystem {
} }
@Override @Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException { public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
LOG.debug("Opening file: {}", f.toString()); LOG.debug("Opening file: {}", f.toString());
Path absolutePath = makeAbsolute(f); Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
FileMetadata meta = store.retrieveMetadata(key); FileMetadata meta = null;
try {
meta = store.retrieveMetadata(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
if (meta == null) { if (meta == null) {
throw new FileNotFoundException(f.toString()); throw new FileNotFoundException(f.toString());
} }
@ -2039,12 +2215,27 @@ public class NativeAzureFileSystem extends FileSystem {
+ " is a directory not a file."); + " is a directory not a file.");
} }
DataInputStream inputStream = null;
try {
inputStream = store.retrieve(key);
} catch(Exception ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("%s is not found", key));
}
throw ex;
}
return new FSDataInputStream(new BufferedFSInputStream( return new FSDataInputStream(new BufferedFSInputStream(
new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize)); new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
} }
@Override @Override
public boolean rename(Path src, Path dst) throws IOException { public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
FolderRenamePending renamePending = null; FolderRenamePending renamePending = null;
@ -2065,7 +2256,27 @@ public class NativeAzureFileSystem extends FileSystem {
// Figure out the final destination // Figure out the final destination
Path absoluteDst = makeAbsolute(dst); Path absoluteDst = makeAbsolute(dst);
String dstKey = pathToKey(absoluteDst); String dstKey = pathToKey(absoluteDst);
FileMetadata dstMetadata = store.retrieveMetadata(dstKey); FileMetadata dstMetadata = null;
try {
dstMetadata = store.retrieveMetadata(dstKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
// A BlobNotFound storage exception in only thrown from retrieveMetdata API when
// there is a race condition. If there is another thread which deletes the destination
// file or folder, then this thread calling rename should be able to continue with
// rename gracefully. Hence the StorageException is swallowed here.
if (innerException instanceof StorageException) {
if (NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
+ "Swallowin the exception to handle race condition gracefully", dstKey);
}
} else {
throw ex;
}
}
if (dstMetadata != null && dstMetadata.isDir()) { if (dstMetadata != null && dstMetadata.isDir()) {
// It's an existing directory. // It's an existing directory.
dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
@ -2078,8 +2289,23 @@ public class NativeAzureFileSystem extends FileSystem {
return false; return false;
} else { } else {
// Check that the parent directory exists. // Check that the parent directory exists.
FileMetadata parentOfDestMetadata = FileMetadata parentOfDestMetadata = null;
store.retrieveMetadata(pathToKey(absoluteDst.getParent())); try {
parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
return false;
}
throw ex;
}
if (parentOfDestMetadata == null) { if (parentOfDestMetadata == null) {
LOG.debug("Parent of the destination {}" LOG.debug("Parent of the destination {}"
+ " doesn't exist, failing the rename.", dst); + " doesn't exist, failing the rename.", dst);
@ -2090,14 +2316,43 @@ public class NativeAzureFileSystem extends FileSystem {
return false; return false;
} }
} }
FileMetadata srcMetadata = store.retrieveMetadata(srcKey); FileMetadata srcMetadata = null;
try {
srcMetadata = store.retrieveMetadata(srcKey);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("Source {} doesn't exists. Failing rename", src);
return false;
}
throw ex;
}
if (srcMetadata == null) { if (srcMetadata == null) {
// Source doesn't exist // Source doesn't exist
LOG.debug("Source {} doesn't exist, failing the rename.", src); LOG.debug("Source {} doesn't exist, failing the rename.", src);
return false; return false;
} else if (!srcMetadata.isDir()) { } else if (!srcMetadata.isDir()) {
LOG.debug("Source {} found as a file, renaming.", src); LOG.debug("Source {} found as a file, renaming.", src);
store.rename(srcKey, dstKey); try {
store.rename(srcKey, dstKey);
} catch(IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
LOG.debug("BlobNotFoundException encountered. Failing rename", src);
return false;
}
throw ex;
}
} else { } else {
// Prepare for, execute and clean up after of all files in folder, and // Prepare for, execute and clean up after of all files in folder, and
@ -2290,10 +2545,24 @@ public class NativeAzureFileSystem extends FileSystem {
} }
@Override @Override
public void setPermission(Path p, FsPermission permission) throws IOException { public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
Path absolutePath = makeAbsolute(p); Path absolutePath = makeAbsolute(p);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
FileMetadata metadata = store.retrieveMetadata(key); FileMetadata metadata = null;
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
throw ex;
}
if (metadata == null) { if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + p); throw new FileNotFoundException("File doesn't exist: " + p);
} }
@ -2317,10 +2586,26 @@ public class NativeAzureFileSystem extends FileSystem {
throws IOException { throws IOException {
Path absolutePath = makeAbsolute(p); Path absolutePath = makeAbsolute(p);
String key = pathToKey(absolutePath); String key = pathToKey(absolutePath);
FileMetadata metadata = store.retrieveMetadata(key); FileMetadata metadata = null;
try {
metadata = store.retrieveMetadata(key);
} catch (IOException ex) {
Throwable innerException = NativeAzureFileSystem.checkForAzureStorageException(ex);
if (innerException instanceof StorageException
&& NativeAzureFileSystem.isFileNotFoundException((StorageException) innerException)) {
throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
}
throw ex;
}
if (metadata == null) { if (metadata == null) {
throw new FileNotFoundException("File doesn't exist: " + p); throw new FileNotFoundException("File doesn't exist: " + p);
} }
PermissionStatus newPermissionStatus = new PermissionStatus( PermissionStatus newPermissionStatus = new PermissionStatus(
username == null ? username == null ?
metadata.getPermissionStatus().getUserName() : username, metadata.getPermissionStatus().getUserName() : username,
@ -2544,4 +2829,40 @@ public class NativeAzureFileSystem extends FileSystem {
} }
} }
} }
/*
* Helper method to recursively check if the cause of the exception is
* a Azure storage exception.
*/
private static Throwable checkForAzureStorageException(Exception e) {
Throwable innerException = e.getCause();
while (innerException != null
&& !(innerException instanceof StorageException)) {
innerException = innerException.getCause();
}
return innerException;
}
/*
* Helper method to check if the AzureStorageException is
* because backing blob was not found.
*/
private static boolean isFileNotFoundException(StorageException e) {
String errorCode = ((StorageException) e).getErrorCode();
if (errorCode != null
&& (errorCode.equals(StorageErrorCodeStrings.BLOB_NOT_FOUND)
|| errorCode.equals(StorageErrorCodeStrings.RESOURCE_NOT_FOUND)
|| errorCode.equals(StorageErrorCode.BLOB_NOT_FOUND.toString())
|| errorCode.equals(StorageErrorCode.RESOURCE_NOT_FOUND.toString()))) {
return true;
}
return false;
}
} }

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
public class ExceptionHandlingTestHelper {
/*
* Helper method to create a PageBlob test storage account.
*/
public static AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
throws Exception {
Configuration conf = new Configuration();
// Configure the page blob directories key so every file created is a page blob.
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
// Configure the atomic rename directories key so every folder will have
// atomic rename applied.
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
return AzureBlobStorageTestAccount.create(conf);
}
/*
* Helper method to create an empty file
*/
public static void createEmptyFile(AzureBlobStorageTestAccount testAccount, Path testPath) throws Exception {
FileSystem fs = testAccount.getFileSystem();
FSDataOutputStream inputStream = fs.create(testPath);
inputStream.close();
}
/*
* Helper method to create an folder and files inside it.
*/
public static void createTestFolder(AzureBlobStorageTestAccount testAccount, Path testFolderPath) throws Exception {
FileSystem fs = testAccount.getFileSystem();
fs.mkdirs(testFolderPath);
String testFolderFilePathBase = "test";
for (int i = 0; i < 10; i++) {
Path p = new Path(testFolderPath.toString() + "/" + testFolderFilePathBase + i + ".dat");
fs.create(p).close();
}
}
}

View File

@ -20,35 +20,25 @@ package org.apache.hadoop.fs.azure;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestFileSystemOperationExceptionHandling extends public class TestFileSystemOperationExceptionHandling extends
NativeAzureFileSystemBaseTest { NativeAzureFileSystemBaseTest {
FSDataInputStream inputStream = null; private FSDataInputStream inputStream = null;
/*
* Helper method to create a PageBlob test storage account.
*/
private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
throws Exception {
Configuration conf = new Configuration(); private static Path testPath = new Path("testfile.dat");
// Configure the page blob directories key so every file created is a page blob.
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
// Configure the atomic rename directories key so every folder will have
// atomic rename applied.
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
return AzureBlobStorageTestAccount.create(conf);
}
private static Path testFolderPath = new Path("testfolder");
/* /*
* Helper method that creates a InputStream to validate exceptions * Helper method that creates a InputStream to validate exceptions
@ -57,7 +47,7 @@ public class TestFileSystemOperationExceptionHandling extends
private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount) private void setupInputStreamToTest(AzureBlobStorageTestAccount testAccount)
throws Exception { throws Exception {
fs = testAccount.getFileSystem(); FileSystem fs = testAccount.getFileSystem();
// Step 1: Create a file and write dummy data. // Step 1: Create a file and write dummy data.
Path testFilePath1 = new Path("test1.dat"); Path testFilePath1 = new Path("test1.dat");
@ -79,7 +69,7 @@ public class TestFileSystemOperationExceptionHandling extends
*/ */
@Test(expected=FileNotFoundException.class) @Test(expected=FileNotFoundException.class)
public void testSingleThreadedPageBlobReadScenario() throws Throwable { public void testSingleThreadedPageBlobReadScenario() throws Throwable {
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); AzureBlobStorageTestAccount testAccount = ExceptionHandlingTestHelper.getPageBlobTestStorageAccount();
setupInputStreamToTest(testAccount); setupInputStreamToTest(testAccount);
byte[] readBuffer = new byte[512]; byte[] readBuffer = new byte[512];
inputStream.read(readBuffer); inputStream.read(readBuffer);
@ -90,7 +80,7 @@ public class TestFileSystemOperationExceptionHandling extends
*/ */
@Test(expected=FileNotFoundException.class) @Test(expected=FileNotFoundException.class)
public void testSingleThreadedPageBlobSeekScenario() throws Throwable { public void testSingleThreadedPageBlobSeekScenario() throws Throwable {
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); AzureBlobStorageTestAccount testAccount = ExceptionHandlingTestHelper.getPageBlobTestStorageAccount();
setupInputStreamToTest(testAccount); setupInputStreamToTest(testAccount);
inputStream.seek(5); inputStream.seek(5);
} }
@ -117,11 +107,158 @@ public class TestFileSystemOperationExceptionHandling extends
inputStream.read(readBuffer); inputStream.read(readBuffer);
} }
@Test(expected=FileNotFoundException.class)
/*
* Tests basic single threaded setPermission scenario
*/
public void testSingleThreadedBlockBlobSetPermissionScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(), testPath);
fs.delete(testPath, true);
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic single threaded setPermission scenario
*/
public void testSingleThreadedPageBlobSetPermissionScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
fs.delete(testPath, true);
fs.setOwner(testPath, "testowner", "testgroup");
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic single threaded setPermission scenario
*/
public void testSingleThreadedBlockBlobSetOwnerScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(), testPath);
fs.delete(testPath, true);
fs.setOwner(testPath, "testowner", "testgroup");
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic single threaded setPermission scenario
*/
public void testSingleThreadedPageBlobSetOwnerScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
fs.delete(testPath, true);
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
@Test(expected=FileNotFoundException.class)
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedBlockBlobListStatusScenario() throws Throwable {
ExceptionHandlingTestHelper.createTestFolder(createTestAccount(), testFolderPath);
fs.delete(testFolderPath, true);
fs.listStatus(testFolderPath);
}
@Test(expected=FileNotFoundException.class)
/*
* Test basica single threaded listStatus scenario
*/
public void testSingleThreadedPageBlobListStatusScenario() throws Throwable {
ExceptionHandlingTestHelper.createTestFolder(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testFolderPath);
fs.delete(testFolderPath, true);
fs.listStatus(testFolderPath);
}
@Test
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedBlockBlobRenameScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(),
testPath);
Path dstPath = new Path("dstFile.dat");
fs.delete(testPath, true);
boolean renameResult = fs.rename(testPath, dstPath);
Assert.assertFalse(renameResult);
}
@Test
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedPageBlobRenameScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
Path dstPath = new Path("dstFile.dat");
fs.delete(testPath, true);
boolean renameResult = fs.rename(testPath, dstPath);
Assert.assertFalse(renameResult);
}
@Test
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedBlockBlobDeleteScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(),
testPath);
fs.delete(testPath, true);
boolean deleteResult = fs.delete(testPath, true);
Assert.assertFalse(deleteResult);
}
@Test
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedPageBlobDeleteScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
fs.delete(testPath, true);
boolean deleteResult = fs.delete(testPath, true);
Assert.assertFalse(deleteResult);
}
@Test(expected=FileNotFoundException.class)
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedBlockBlobOpenScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(),
testPath);
fs.delete(testPath, true);
inputStream = fs.open(testPath);
}
@Test(expected=FileNotFoundException.class)
/*
* Test basic single threaded listStatus scenario
*/
public void testSingleThreadedPageBlobOpenScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
fs.delete(testPath, true);
inputStream = fs.open(testPath);
}
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
if (inputStream != null) { if (inputStream != null) {
inputStream.close(); inputStream.close();
} }
if (fs != null && fs.exists(testPath)) {
fs.delete(testPath, true);
}
} }
@Override @Override

View File

@ -20,11 +20,12 @@ package org.apache.hadoop.fs.azure;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -32,6 +33,11 @@ public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
NativeAzureFileSystemBaseTest { NativeAzureFileSystemBaseTest {
FSDataInputStream inputStream = null; FSDataInputStream inputStream = null;
private static Path testPath = new Path("testfile.dat");
private static Path testFolderPath = new Path("testfolder");
/* /*
* Helper method to creates an input stream to test various scenarios. * Helper method to creates an input stream to test various scenarios.
*/ */
@ -87,6 +93,135 @@ public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
inputStream.seek(5); inputStream.seek(5);
} }
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setPermission scenario
*/
public void testMultiThreadedPageBlobSetPermissionScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setPermission scenario
*/
public void testMultiThreadedBlockBlobSetPermissionScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(),
testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
fs.setPermission(testPath, new FsPermission(FsAction.EXECUTE, FsAction.READ, FsAction.READ));
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setPermission scenario
*/
public void testMultiThreadedPageBlobOpenScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(),
testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
inputStream = fs.open(testPath);
inputStream.close();
}
inputStream = fs.open(testPath);
inputStream.close();
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setPermission scenario
*/
public void testMultiThreadedBlockBlobOpenScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
inputStream = fs.open(testPath);
inputStream.close();
}
inputStream = fs.open(testPath);
inputStream.close();
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setOwner scenario
*/
public void testMultiThreadedBlockBlobSetOwnerScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(createTestAccount(), testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
fs.setOwner(testPath, "testowner", "testgroup");
}
fs.setOwner(testPath, "testowner", "testgroup");
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded setOwner scenario
*/
public void testMultiThreadedPageBlobSetOwnerScenario() throws Throwable {
ExceptionHandlingTestHelper.createEmptyFile(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testPath);
Thread t = new Thread(new DeleteThread(fs, testPath));
t.start();
while (t.isAlive()) {
fs.setOwner(testPath, "testowner", "testgroup");
}
fs.setOwner(testPath, "testowner", "testgroup");
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded listStatus scenario
*/
public void testMultiThreadedBlockBlobListStatusScenario() throws Throwable {
ExceptionHandlingTestHelper.createTestFolder(createTestAccount(), testFolderPath);
Thread t = new Thread(new DeleteThread(fs, testFolderPath));
t.start();
while (t.isAlive()) {
fs.listStatus(testFolderPath);
}
fs.listStatus(testFolderPath);
}
@Test(expected=FileNotFoundException.class)
/*
* Tests basic multi threaded listStatus scenario
*/
public void testMultiThreadedPageBlobListStatusScenario() throws Throwable {
ExceptionHandlingTestHelper.createTestFolder(ExceptionHandlingTestHelper.getPageBlobTestStorageAccount(),
testFolderPath);
Thread t = new Thread(new DeleteThread(fs, testFolderPath));
t.start();
while (t.isAlive()) {
fs.listStatus(testFolderPath);
}
fs.listStatus(testFolderPath);
}
/* /*
* Test to validate correct exception is thrown for Multithreaded read * Test to validate correct exception is thrown for Multithreaded read
* scenario for page blobs * scenario for page blobs
@ -95,7 +230,7 @@ public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
@Test(expected=FileNotFoundException.class) @Test(expected=FileNotFoundException.class)
public void testMultiThreadedPageBlobReadScenario() throws Throwable { public void testMultiThreadedPageBlobReadScenario() throws Throwable {
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); AzureBlobStorageTestAccount testAccount = ExceptionHandlingTestHelper.getPageBlobTestStorageAccount();
fs = testAccount.getFileSystem(); fs = testAccount.getFileSystem();
Path testFilePath1 = new Path("test1.dat"); Path testFilePath1 = new Path("test1.dat");
@ -116,7 +251,7 @@ public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
@Test(expected=FileNotFoundException.class) @Test(expected=FileNotFoundException.class)
public void testMultiThreadedPageBlobSeekScenario() throws Throwable { public void testMultiThreadedPageBlobSeekScenario() throws Throwable {
AzureBlobStorageTestAccount testAccount = getPageBlobTestStorageAccount(); AzureBlobStorageTestAccount testAccount = ExceptionHandlingTestHelper.getPageBlobTestStorageAccount();
fs = testAccount.getFileSystem(); fs = testAccount.getFileSystem();
Path testFilePath1 = new Path("test1.dat"); Path testFilePath1 = new Path("test1.dat");
@ -133,28 +268,16 @@ public class TestFileSystemOperationsExceptionHandlingMultiThreaded extends
return AzureBlobStorageTestAccount.create(); return AzureBlobStorageTestAccount.create();
} }
/*
* Helper method to create a PageBlob test storage account.
*/
private AzureBlobStorageTestAccount getPageBlobTestStorageAccount()
throws Exception {
Configuration conf = new Configuration();
// Configure the page blob directories key so every file created is a page blob.
conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
// Configure the atomic rename directories key so every folder will have
// atomic rename applied.
conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
return AzureBlobStorageTestAccount.create(conf);
}
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
if (inputStream != null) { if (inputStream != null) {
inputStream.close(); inputStream.close();
} }
if (fs != null && fs.exists(testPath)) {
fs.delete(testPath, true);
}
} }
} }
@ -183,3 +306,24 @@ class RenameThread implements Runnable {
} }
} }
} }
class DeleteThread implements Runnable {
private FileSystem fs;
private Path testPath;
public DeleteThread(FileSystem fs, Path testPath) {
this.fs = fs;
this.testPath = testPath;
}
@Override
public void run() {
try {
fs.delete(testPath, true);
} catch (Exception e) {
// Swallowing the exception as the
// correctness of the test is controlled
// by the other thread
}
}
}