HDFS-12217. HDFS snapshots doesn't capture all open files when one of the open files is deleted.

(cherry picked from commit 52d7bafcf4)
This commit is contained in:
Manoj Govindassamy 2017-08-01 16:28:20 -07:00
parent 70c511598f
commit b0330c15da
5 changed files with 157 additions and 14 deletions

View File

@ -30,4 +30,8 @@ public class SnapshotException extends IOException {
public SnapshotException(final Throwable cause) {
super(cause);
}
public SnapshotException(final String message, final Throwable cause) {
super(message, cause);
}
}

View File

@ -4779,7 +4779,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return blockId;
}
private boolean isFileDeleted(INodeFile file) {
boolean isFileDeleted(INodeFile file) {
// Not in the inodeMap or in the snapshot but marked deleted.
if (dir.getInode(file.getId()) == null) {
return true;

View File

@ -213,18 +213,25 @@ public class LeaseManager {
*
* @return Set<INodesInPath>
*/
public Set<INodesInPath> getINodeWithLeases() {
@VisibleForTesting
Set<INodesInPath> getINodeWithLeases() throws IOException {
return getINodeWithLeases(null);
}
private synchronized INode[] getINodesWithLease() {
int inodeCount = 0;
INode[] inodes = new INode[leasesById.size()];
List<INode> inodes = new ArrayList<>(leasesById.size());
INode currentINode;
for (long inodeId : leasesById.keySet()) {
inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId);
inodeCount++;
currentINode = fsnamesystem.getFSDirectory().getInode(inodeId);
// A file with an active lease could get deleted, or its
// parent directories could get recursively deleted.
if (currentINode != null &&
currentINode.isFile() &&
!fsnamesystem.isFileDeleted(currentINode.asFile())) {
inodes.add(currentINode);
}
}
return inodes;
return inodes.toArray(new INode[0]);
}
/**
@ -237,7 +244,7 @@ public class LeaseManager {
* @return Set<INodesInPath>
*/
public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
ancestorDir) {
ancestorDir) throws IOException {
assert fsnamesystem.hasReadLock();
final long startTimeMs = Time.monotonicNow();
Set<INodesInPath> iipSet = new HashSet<>();
@ -284,7 +291,7 @@ public class LeaseManager {
try {
iipSet.addAll(f.get());
} catch (Exception e) {
LOG.warn("INode filter task encountered exception: ", e);
throw new IOException("Failed to get files with active leases", e);
}
}
final long endTimeMs = Time.monotonicNow();

View File

@ -195,11 +195,17 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
if (captureOpenFiles) {
Set<INodesInPath> openFilesIIP =
leaseManager.getINodeWithLeases(snapshotRoot);
for (INodesInPath openFileIIP : openFilesIIP) {
INodeFile openFile = openFileIIP.getLastINode().asFile();
openFile.recordModification(openFileIIP.getLatestSnapshotId());
try {
Set<INodesInPath> openFilesIIP =
leaseManager.getINodeWithLeases(snapshotRoot);
for (INodesInPath openFileIIP : openFilesIIP) {
INodeFile openFile = openFileIIP.getLastINode().asFile();
openFile.recordModification(openFileIIP.getLatestSnapshotId());
}
} catch (Exception e) {
throw new SnapshotException("Failed to add snapshot: Unable to " +
"capture all open files under the snapshot dir " +
snapshotRoot.getFullPathName() + " for snapshot '" + name + "'", e);
}
}
return s;

View File

@ -496,6 +496,132 @@ public class TestOpenFilesWithSnapshot {
flumeOutputStream.close();
}
/**
* Test snapshot capturing open files when an open file with active lease
* is deleted by the client.
*/
@Test (timeout = 120000)
public void testSnapshotsForOpenFilesAndDeletion() throws Exception {
// Construct the directory tree
final Path snapRootDir = new Path("/level_0_A");
final String flumeFileName = "flume.log";
final String hbaseFileName = "hbase.log";
final String snap1Name = "snap_1";
final String snap2Name = "snap_2";
final String snap3Name = "snap_3";
// Create files and open streams
final Path flumeFile = new Path(snapRootDir, flumeFileName);
createFile(flumeFile);
final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
createFile(hbaseFile);
FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
// Create Snapshot S1
final Path snap1Dir = SnapshotTestHelper.createSnapshot(
fs, snapRootDir, snap1Name);
final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName);
final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
// Verify if Snap S1 file length is same as the the current versions
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS1,
fs.getFileStatus(hbaseS1Path).getLen());
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
int newWriteLength = (int) (BLOCKSIZE * 1.5);
byte[] buf = new byte[newWriteLength];
Random random = new Random();
random.nextBytes(buf);
// Write more data to files
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
// Create Snapshot S2
final Path snap2Dir = SnapshotTestHelper.createSnapshot(
fs, snapRootDir, snap2Name);
final Path flumeS2Path = new Path(snap2Dir, flumeFileName);
final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName);
// Verify current files length are same as all data written till now
final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
// Verify if Snap S2 file length is same as the current versions
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS2,
fs.getFileStatus(hbaseS2Path).getLen());
// Write more data to open files
writeToStream(flumeOutputStream, buf);
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
// Verify old snapshots have point-in-time/frozen file
// lengths even after the current versions have moved forward.
Assert.assertEquals(flumeFileLengthAfterS1,
fs.getFileStatus(flumeS1Path).getLen());
Assert.assertEquals(flumeFileLengthAfterS2,
fs.getFileStatus(flumeS2Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS1,
fs.getFileStatus(hbaseS1Path).getLen());
Assert.assertEquals(hbaseFileLengthAfterS2,
fs.getFileStatus(hbaseS2Path).getLen());
// Delete flume current file. Snapshots should
// still have references to flume file.
boolean flumeFileDeleted = fs.delete(flumeFile, true);
Assert.assertTrue(flumeFileDeleted);
Assert.assertFalse(fs.exists(flumeFile));
Assert.assertTrue(fs.exists(flumeS1Path));
Assert.assertTrue(fs.exists(flumeS2Path));
SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap");
fs.deleteSnapshot(snapRootDir, "tmp_snap");
// Delete snap_2. snap_1 still has reference to
// the flume file.
fs.deleteSnapshot(snapRootDir, snap2Name);
Assert.assertFalse(fs.exists(flumeS2Path));
Assert.assertTrue(fs.exists(flumeS1Path));
// Delete snap_1. Now all traces of flume file
// is gone.
fs.deleteSnapshot(snapRootDir, snap1Name);
Assert.assertFalse(fs.exists(flumeS2Path));
Assert.assertFalse(fs.exists(flumeS1Path));
// Create Snapshot S3
final Path snap3Dir = SnapshotTestHelper.createSnapshot(
fs, snapRootDir, snap3Name);
final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName);
// Verify live files length is same as all data written till now
final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
// Write more data to open files
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
// Verify old snapshots have point-in-time/frozen file
// lengths even after the flume open file is deleted and
// the hbase live file has moved forward.
Assert.assertEquals(hbaseFileLengthAfterS3,
fs.getFileStatus(hbaseS3Path).getLen());
Assert.assertEquals(hbaseFileWrittenDataLength,
fs.getFileStatus(hbaseFile).getLen());
hbaseOutputStream.close();
}
private void restartNameNode() throws Exception {
cluster.triggerBlockReports();
NameNode nameNode = cluster.getNameNode();