HADOOP-13675. Bug in return value for delete() calls in WASB. Contributed by Dushyanth
(cherry picked from commit 15dd1f3381
)
This commit is contained in:
parent
0f6fbfc0db
commit
89eaa94c59
|
@ -2045,10 +2045,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
* The key to search for.
|
* The key to search for.
|
||||||
* @return The wanted directory, or null if not found.
|
* @return The wanted directory, or null if not found.
|
||||||
*/
|
*/
|
||||||
private static FileMetadata getDirectoryInList(
|
private static FileMetadata getFileMetadataInList(
|
||||||
final Iterable<FileMetadata> list, String key) {
|
final Iterable<FileMetadata> list, String key) {
|
||||||
for (FileMetadata current : list) {
|
for (FileMetadata current : list) {
|
||||||
if (current.isDir() && current.getKey().equals(key)) {
|
if (current.getKey().equals(key)) {
|
||||||
return current;
|
return current;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2114,7 +2114,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
|
|
||||||
// Add the metadata to the list, but remove any existing duplicate
|
// Add the metadata to the list, but remove any existing duplicate
|
||||||
// entries first that we may have added by finding nested files.
|
// entries first that we may have added by finding nested files.
|
||||||
FileMetadata existing = getDirectoryInList(fileMetadata, blobKey);
|
FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
|
||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
fileMetadata.remove(existing);
|
fileMetadata.remove(existing);
|
||||||
}
|
}
|
||||||
|
@ -2141,7 +2141,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
|
|
||||||
// Add the directory metadata to the list only if it's not already
|
// Add the directory metadata to the list only if it's not already
|
||||||
// there.
|
// there.
|
||||||
if (getDirectoryInList(fileMetadata, dirKey) == null) {
|
if (getFileMetadataInList(fileMetadata, dirKey) == null) {
|
||||||
fileMetadata.add(directoryMetadata);
|
fileMetadata.add(directoryMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2249,7 +2249,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
|
|
||||||
// Add the directory metadata to the list only if it's not already
|
// Add the directory metadata to the list only if it's not already
|
||||||
// there.
|
// there.
|
||||||
FileMetadata existing = getDirectoryInList(aFileMetadataList, blobKey);
|
FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey);
|
||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
aFileMetadataList.remove(existing);
|
aFileMetadataList.remove(existing);
|
||||||
}
|
}
|
||||||
|
@ -2278,7 +2278,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
// absolute path is being used or not.
|
// absolute path is being used or not.
|
||||||
String dirKey = normalizeKey(directory);
|
String dirKey = normalizeKey(directory);
|
||||||
|
|
||||||
if (getDirectoryInList(aFileMetadataList, dirKey) == null) {
|
if (getFileMetadataInList(aFileMetadataList, dirKey) == null) {
|
||||||
// Reached the targeted listing depth. Return metadata for the
|
// Reached the targeted listing depth. Return metadata for the
|
||||||
// directory using default permissions.
|
// directory using default permissions.
|
||||||
//
|
//
|
||||||
|
@ -2376,18 +2376,24 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* API implementation to delete a blob in the back end azure storage.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void delete(String key, SelfRenewingLease lease) throws IOException {
|
public boolean delete(String key, SelfRenewingLease lease) throws IOException {
|
||||||
try {
|
try {
|
||||||
if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
|
if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) {
|
||||||
// Container doesn't exist, no need to do anything
|
// Container doesn't exist, no need to do anything
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the blob reference and delete it.
|
// Get the blob reference and delete it.
|
||||||
CloudBlobWrapper blob = getBlobReference(key);
|
CloudBlobWrapper blob = getBlobReference(key);
|
||||||
if (blob.exists(getInstrumentedContext())) {
|
if (blob.exists(getInstrumentedContext())) {
|
||||||
safeDelete(blob, lease);
|
safeDelete(blob, lease);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Re-throw as an Azure storage exception.
|
// Re-throw as an Azure storage exception.
|
||||||
|
@ -2395,10 +2401,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* API implementation to delete a blob in the back end azure storage.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void delete(String key) throws IOException {
|
public boolean delete(String key) throws IOException {
|
||||||
try {
|
try {
|
||||||
delete(key, null);
|
return delete(key, null);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Throwable t = e.getCause();
|
Throwable t = e.getCause();
|
||||||
if(t != null && t instanceof StorageException) {
|
if(t != null && t instanceof StorageException) {
|
||||||
|
@ -2407,7 +2416,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
SelfRenewingLease lease = null;
|
SelfRenewingLease lease = null;
|
||||||
try {
|
try {
|
||||||
lease = acquireLease(key);
|
lease = acquireLease(key);
|
||||||
delete(key, lease);
|
return delete(key, lease);
|
||||||
} catch (AzureException e3) {
|
} catch (AzureException e3) {
|
||||||
LOG.warn("Got unexpected exception trying to acquire lease on "
|
LOG.warn("Got unexpected exception trying to acquire lease on "
|
||||||
+ key + "." + e3.getMessage());
|
+ key + "." + e3.getMessage());
|
||||||
|
|
|
@ -1765,8 +1765,11 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
store.delete(key);
|
if (store.delete(key)) {
|
||||||
instrumentation.fileDeleted();
|
instrumentation.fileDeleted();
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
|
|
||||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||||
|
@ -1885,7 +1888,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the current directory
|
// Delete the current directory
|
||||||
if (!deleteFile(metaFile.getKey(), metaFile.isDir())) {
|
if (store.retrieveMetadata(metaFile.getKey()) != null && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
|
||||||
LOG.error("Failed delete directory {}", f.toString());
|
LOG.error("Failed delete directory {}", f.toString());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1913,11 +1916,15 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean deleteFile(String key, boolean isDir) throws IOException {
|
boolean deleteFile(String key, boolean isDir) throws IOException {
|
||||||
try {
|
try {
|
||||||
store.delete(key);
|
if (store.delete(key)) {
|
||||||
if (isDir) {
|
if (isDir) {
|
||||||
instrumentation.directoryDeleted();
|
instrumentation.directoryDeleted();
|
||||||
|
} else {
|
||||||
|
instrumentation.fileDeleted();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
instrumentation.fileDeleted();
|
return false;
|
||||||
}
|
}
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||||
|
@ -1929,8 +1936,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
|
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2790,6 +2795,8 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
LOG.debug("Deleting dangling file {}", file.getKey());
|
LOG.debug("Deleting dangling file {}", file.getKey());
|
||||||
|
// Not handling delete return type as false return essentially
|
||||||
|
// means its a no-op for the caller
|
||||||
store.delete(file.getKey());
|
store.delete(file.getKey());
|
||||||
store.delete(tempFile.getKey());
|
store.delete(tempFile.getKey());
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,16 @@ interface NativeFileSystemStore {
|
||||||
void changePermissionStatus(String key, PermissionStatus newPermission)
|
void changePermissionStatus(String key, PermissionStatus newPermission)
|
||||||
throws AzureException;
|
throws AzureException;
|
||||||
|
|
||||||
void delete(String key) throws IOException;
|
/**
|
||||||
|
* API to delete a blob in the back end azure storage.
|
||||||
|
* @param key - key to the blob being deleted.
|
||||||
|
* @return return true when delete is successful, false if
|
||||||
|
* blob cannot be found or delete is not possible without
|
||||||
|
* exception.
|
||||||
|
* @throws IOException Exception encountered while deleting in
|
||||||
|
* azure storage.
|
||||||
|
*/
|
||||||
|
boolean delete(String key) throws IOException;
|
||||||
|
|
||||||
void rename(String srcKey, String dstKey) throws IOException;
|
void rename(String srcKey, String dstKey) throws IOException;
|
||||||
|
|
||||||
|
@ -104,7 +113,17 @@ interface NativeFileSystemStore {
|
||||||
void updateFolderLastModifiedTime(String key, Date lastModified,
|
void updateFolderLastModifiedTime(String key, Date lastModified,
|
||||||
SelfRenewingLease folderLease) throws AzureException;
|
SelfRenewingLease folderLease) throws AzureException;
|
||||||
|
|
||||||
void delete(String key, SelfRenewingLease lease) throws IOException;
|
/**
|
||||||
|
* API to delete a blob in the back end azure storage.
|
||||||
|
* @param key - key to the blob being deleted.
|
||||||
|
* @param lease - Active lease on the blob.
|
||||||
|
* @return return true when delete is successful, false if
|
||||||
|
* blob cannot be found or delete is not possible without
|
||||||
|
* exception.
|
||||||
|
* @throws IOException Exception encountered while deleting in
|
||||||
|
* azure storage.
|
||||||
|
*/
|
||||||
|
boolean delete(String key, SelfRenewingLease lease) throws IOException;
|
||||||
|
|
||||||
SelfRenewingLease acquireLease(String key) throws AzureException;
|
SelfRenewingLease acquireLease(String key) throws AzureException;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Test class to hold all Live Azure storage concurrency tests.
|
||||||
|
*/
|
||||||
|
public class TestNativeAzureFileSystemConcurrencyLive
|
||||||
|
extends AbstractWasbTestBase {
|
||||||
|
|
||||||
|
private static final int TEST_COUNT = 102;
|
||||||
|
@Override
|
||||||
|
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||||
|
return AzureBlobStorageTestAccount.create();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test multi-threaded deletes in WASB. Expected behavior is one of the thread
|
||||||
|
* should be to successfully delete the file and return true and all other
|
||||||
|
* threads need to return false.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMultiThreadedDeletes() throws Exception {
|
||||||
|
Path testFile = new Path("test.dat");
|
||||||
|
fs.create(testFile).close();
|
||||||
|
|
||||||
|
int threadCount = TEST_COUNT;
|
||||||
|
DeleteHelperThread[] helperThreads = new DeleteHelperThread[threadCount];
|
||||||
|
|
||||||
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
helperThreads[i] = new DeleteHelperThread(fs, testFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread[] threads = new Thread[threadCount];
|
||||||
|
|
||||||
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
threads[i] = new Thread(helperThreads[i]);
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean deleteSuccess = false;
|
||||||
|
|
||||||
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
|
||||||
|
Assert.assertFalse("child thread has exception : " + helperThreads[i].getException(),
|
||||||
|
helperThreads[i].getExceptionEncounteredFlag());
|
||||||
|
|
||||||
|
if (deleteSuccess) {
|
||||||
|
Assert.assertFalse("More than one thread delete() retuhelperThreads[i].getDeleteSuccess()",
|
||||||
|
helperThreads[i].getExceptionEncounteredFlag());
|
||||||
|
} else {
|
||||||
|
deleteSuccess = helperThreads[i].getDeleteSuccess();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue("No successfull delete found", deleteSuccess);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DeleteHelperThread implements Runnable {
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path p;
|
||||||
|
private boolean deleteSuccess;
|
||||||
|
private boolean exceptionEncountered;
|
||||||
|
private Exception ex;
|
||||||
|
|
||||||
|
public DeleteHelperThread(FileSystem fs, Path p) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.p = p;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
deleteSuccess = fs.delete(p, false);
|
||||||
|
} catch (Exception ioEx) {
|
||||||
|
exceptionEncountered = true;
|
||||||
|
this.ex = ioEx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getDeleteSuccess() {
|
||||||
|
return deleteSuccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getExceptionEncounteredFlag() {
|
||||||
|
return exceptionEncountered;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception getException() {
|
||||||
|
return ex;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue