HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to address case of potential data loss due to partial copy. Contributed by Gaurav Kanade.
(cherry picked from commit 978bbdfeb2d12efd6e750da6a14849e072fb814b)
This commit is contained in:
parent
2e6990cdf2
commit
4753676a34
@ -926,6 +926,10 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HADOOP-12689. S3 filesystem operations stopped working correctly
|
HADOOP-12689. S3 filesystem operations stopped working correctly
|
||||||
(Matt Paduano via raviprak)
|
(Matt Paduano via raviprak)
|
||||||
|
|
||||||
|
HADOOP-12634. Change Lazy Rename Pending Operation Completion of WASB to
|
||||||
|
address case of potential data loss due to partial copy
|
||||||
|
(Gaurav Kanade via cnauroth)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -536,45 +536,13 @@ private void finishSingleFileRename(String fileName)
|
|||||||
Path dstFile = fullPath(dstKey, fileName);
|
Path dstFile = fullPath(dstKey, fileName);
|
||||||
boolean srcExists = fs.exists(srcFile);
|
boolean srcExists = fs.exists(srcFile);
|
||||||
boolean dstExists = fs.exists(dstFile);
|
boolean dstExists = fs.exists(dstFile);
|
||||||
if (srcExists && !dstExists) {
|
if(srcExists) {
|
||||||
|
|
||||||
// Rename gets exclusive access (via a lease) for HBase write-ahead log
|
// Rename gets exclusive access (via a lease) for HBase write-ahead log
|
||||||
// (WAL) file processing correctness. See the rename code for details.
|
// (WAL) file processing correctness. See the rename code for details.
|
||||||
String srcName = fs.pathToKey(srcFile);
|
String srcName = fs.pathToKey(srcFile);
|
||||||
String dstName = fs.pathToKey(dstFile);
|
String dstName = fs.pathToKey(dstFile);
|
||||||
fs.getStoreInterface().rename(srcName, dstName, true, null);
|
fs.getStoreInterface().rename(srcName, dstName, true, null);
|
||||||
} else if (srcExists && dstExists) {
|
|
||||||
|
|
||||||
// Get a lease on source to block write access.
|
|
||||||
String srcName = fs.pathToKey(srcFile);
|
|
||||||
SelfRenewingLease lease = null;
|
|
||||||
try {
|
|
||||||
lease = fs.acquireLease(srcFile);
|
|
||||||
// Delete the file. This will free the lease too.
|
|
||||||
fs.getStoreInterface().delete(srcName, lease);
|
|
||||||
} catch(AzureException e) {
|
|
||||||
String errorCode = "";
|
|
||||||
try {
|
|
||||||
StorageException e2 = (StorageException) e.getCause();
|
|
||||||
errorCode = e2.getErrorCode();
|
|
||||||
} catch(Exception e3) {
|
|
||||||
// do nothing if cast fails
|
|
||||||
}
|
|
||||||
// If the rename already finished do nothing
|
|
||||||
if(!errorCode.equals("BlobNotFound")){
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
if(lease != null){
|
|
||||||
lease.free();
|
|
||||||
}
|
|
||||||
} catch(StorageException e) {
|
|
||||||
LOG.warn("Unable to free lease because: " + e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (!srcExists && dstExists) {
|
} else if (!srcExists && dstExists) {
|
||||||
|
|
||||||
// The rename already finished, so do nothing.
|
// The rename already finished, so do nothing.
|
||||||
;
|
;
|
||||||
} else {
|
} else {
|
||||||
|
@ -24,6 +24,8 @@
|
|||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
|
||||||
@ -43,6 +45,26 @@ protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
|||||||
return AzureBlobStorageTestAccount.create();
|
return AzureBlobStorageTestAccount.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyRenamePendingCanOverwriteExistingFile()
|
||||||
|
throws Exception {
|
||||||
|
final String SRC_FILE_KEY = "srcFile";
|
||||||
|
final String DST_FILE_KEY = "dstFile";
|
||||||
|
Path srcPath = new Path(SRC_FILE_KEY);
|
||||||
|
FSDataOutputStream srcStream = fs.create(srcPath);
|
||||||
|
assertTrue(fs.exists(srcPath));
|
||||||
|
Path dstPath = new Path(DST_FILE_KEY);
|
||||||
|
FSDataOutputStream dstStream = fs.create(dstPath);
|
||||||
|
assertTrue(fs.exists(dstPath));
|
||||||
|
NativeAzureFileSystem nfs = (NativeAzureFileSystem)fs;
|
||||||
|
final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
|
||||||
|
final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
|
||||||
|
nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
|
||||||
|
assertTrue(fs.exists(dstPath));
|
||||||
|
assertFalse(fs.exists(srcPath));
|
||||||
|
IOUtils.cleanup(null, srcStream);
|
||||||
|
IOUtils.cleanup(null, dstStream);
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Tests fs.delete() function to delete a blob when another blob is holding a
|
* Tests fs.delete() function to delete a blob when another blob is holding a
|
||||||
* lease on it. Delete if called without a lease should fail if another process
|
* lease on it. Delete if called without a lease should fail if another process
|
||||||
|
Loading…
x
Reference in New Issue
Block a user