HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to address case of potential data loss due to partial copy. Contributed by Gaurav Kanade.
(cherry picked from commit978bbdfeb2
) (cherry picked from commit4753676a34
) Conflicts: hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
parent
ae535ec93e
commit
506a517f9f
|
@ -904,6 +904,10 @@ Release 2.8.0 - UNRELEASED
|
||||||
HADOOP-12608. Fix exception message in WASB when connecting with anonymous
|
HADOOP-12608. Fix exception message in WASB when connecting with anonymous
|
||||||
credential. (Dushyanth via xyao)
|
credential. (Dushyanth via xyao)
|
||||||
|
|
||||||
|
HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to
|
||||||
|
address case of potential data loss due to partial copy
|
||||||
|
(Gaurav Kanade via cnauroth)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -536,45 +536,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
Path dstFile = fullPath(dstKey, fileName);
|
Path dstFile = fullPath(dstKey, fileName);
|
||||||
boolean srcExists = fs.exists(srcFile);
|
boolean srcExists = fs.exists(srcFile);
|
||||||
boolean dstExists = fs.exists(dstFile);
|
boolean dstExists = fs.exists(dstFile);
|
||||||
if (srcExists && !dstExists) {
|
if(srcExists) {
|
||||||
|
|
||||||
// Rename gets exclusive access (via a lease) for HBase write-ahead log
|
// Rename gets exclusive access (via a lease) for HBase write-ahead log
|
||||||
// (WAL) file processing correctness. See the rename code for details.
|
// (WAL) file processing correctness. See the rename code for details.
|
||||||
String srcName = fs.pathToKey(srcFile);
|
String srcName = fs.pathToKey(srcFile);
|
||||||
String dstName = fs.pathToKey(dstFile);
|
String dstName = fs.pathToKey(dstFile);
|
||||||
fs.getStoreInterface().rename(srcName, dstName, true, null);
|
fs.getStoreInterface().rename(srcName, dstName, true, null);
|
||||||
} else if (srcExists && dstExists) {
|
|
||||||
|
|
||||||
// Get a lease on source to block write access.
|
|
||||||
String srcName = fs.pathToKey(srcFile);
|
|
||||||
SelfRenewingLease lease = null;
|
|
||||||
try {
|
|
||||||
lease = fs.acquireLease(srcFile);
|
|
||||||
// Delete the file. This will free the lease too.
|
|
||||||
fs.getStoreInterface().delete(srcName, lease);
|
|
||||||
} catch(AzureException e) {
|
|
||||||
String errorCode = "";
|
|
||||||
try {
|
|
||||||
StorageException e2 = (StorageException) e.getCause();
|
|
||||||
errorCode = e2.getErrorCode();
|
|
||||||
} catch(Exception e3) {
|
|
||||||
// do nothing if cast fails
|
|
||||||
}
|
|
||||||
// If the rename already finished do nothing
|
|
||||||
if(!errorCode.equals("BlobNotFound")){
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
if(lease != null){
|
|
||||||
lease.free();
|
|
||||||
}
|
|
||||||
} catch(StorageException e) {
|
|
||||||
LOG.warn("Unable to free lease because: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (!srcExists && dstExists) {
|
} else if (!srcExists && dstExists) {
|
||||||
|
|
||||||
// The rename already finished, so do nothing.
|
// The rename already finished, so do nothing.
|
||||||
;
|
;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
|
||||||
|
@ -43,6 +45,26 @@ public class TestNativeAzureFileSystemLive extends
|
||||||
return AzureBlobStorageTestAccount.create();
|
return AzureBlobStorageTestAccount.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyRenamePendingCanOverwriteExistingFile()
|
||||||
|
throws Exception {
|
||||||
|
final String SRC_FILE_KEY = "srcFile";
|
||||||
|
final String DST_FILE_KEY = "dstFile";
|
||||||
|
Path srcPath = new Path(SRC_FILE_KEY);
|
||||||
|
FSDataOutputStream srcStream = fs.create(srcPath);
|
||||||
|
assertTrue(fs.exists(srcPath));
|
||||||
|
Path dstPath = new Path(DST_FILE_KEY);
|
||||||
|
FSDataOutputStream dstStream = fs.create(dstPath);
|
||||||
|
assertTrue(fs.exists(dstPath));
|
||||||
|
NativeAzureFileSystem nfs = (NativeAzureFileSystem)fs;
|
||||||
|
final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
|
||||||
|
final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
|
||||||
|
nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
|
||||||
|
assertTrue(fs.exists(dstPath));
|
||||||
|
assertFalse(fs.exists(srcPath));
|
||||||
|
IOUtils.cleanup(null, srcStream);
|
||||||
|
IOUtils.cleanup(null, dstStream);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Tests fs.delete() function to delete a blob when another blob is holding a
|
* Tests fs.delete() function to delete a blob when another blob is holding a
|
||||||
* lease on it. Delete if called without a lease should fail if another process
|
* lease on it. Delete if called without a lease should fail if another process
|
||||||
|
|
Loading…
Reference in New Issue