diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotException.java index e9c5b2a4b19..49f3eaaba2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotException.java @@ -30,4 +30,8 @@ public class SnapshotException extends IOException { public SnapshotException(final Throwable cause) { super(cause); } + + public SnapshotException(final String message, final Throwable cause) { + super(message, cause); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index cc6201d6234..825549c9a28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -4779,7 +4779,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return blockId; } - private boolean isFileDeleted(INodeFile file) { + boolean isFileDeleted(INodeFile file) { // Not in the inodeMap or in the snapshot but marked deleted. if (dir.getInode(file.getId()) == null) { return true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index 3b673f418b6..51125a149cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -213,18 +213,25 @@ public class LeaseManager { * * @return Set */ - public Set getINodeWithLeases() { + @VisibleForTesting + Set getINodeWithLeases() throws IOException { return getINodeWithLeases(null); } private synchronized INode[] getINodesWithLease() { - int inodeCount = 0; - INode[] inodes = new INode[leasesById.size()]; + List inodes = new ArrayList<>(leasesById.size()); + INode currentINode; for (long inodeId : leasesById.keySet()) { - inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId); - inodeCount++; + currentINode = fsnamesystem.getFSDirectory().getInode(inodeId); + // A file with an active lease could get deleted, or its + // parent directories could get recursively deleted. + if (currentINode != null && + currentINode.isFile() && + !fsnamesystem.isFileDeleted(currentINode.asFile())) { + inodes.add(currentINode); + } } - return inodes; + return inodes.toArray(new INode[0]); } /** @@ -237,7 +244,7 @@ public class LeaseManager { * @return Set */ public Set getINodeWithLeases(final INodeDirectory - ancestorDir) { + ancestorDir) throws IOException { assert fsnamesystem.hasReadLock(); final long startTimeMs = Time.monotonicNow(); Set iipSet = new HashSet<>(); @@ -284,7 +291,7 @@ public class LeaseManager { try { iipSet.addAll(f.get()); } catch (Exception e) { - LOG.warn("INode filter task encountered exception: ", e); + throw new IOException("Failed to get files with active leases", e); } } final long endTimeMs = Time.monotonicNow(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java index 0ab928d04dc..23dcbe8c9a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectorySnapshottableFeature.java @@ -195,11 +195,17 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID); if (captureOpenFiles) { - Set openFilesIIP = - leaseManager.getINodeWithLeases(snapshotRoot); - for (INodesInPath openFileIIP : openFilesIIP) { - INodeFile openFile = openFileIIP.getLastINode().asFile(); - openFile.recordModification(openFileIIP.getLatestSnapshotId()); + try { + Set openFilesIIP = + leaseManager.getINodeWithLeases(snapshotRoot); + for (INodesInPath openFileIIP : openFilesIIP) { + INodeFile openFile = openFileIIP.getLastINode().asFile(); + openFile.recordModification(openFileIIP.getLatestSnapshotId()); + } + } catch (Exception e) { + throw new SnapshotException("Failed to add snapshot: Unable to " + + "capture all open files under the snapshot dir " + + snapshotRoot.getFullPathName() + " for snapshot '" + name + "'", e); } } return s; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java index 7aaadf85d0b..fb83a3e3ee1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestOpenFilesWithSnapshot.java @@ -496,6 +496,132 @@ public class TestOpenFilesWithSnapshot { flumeOutputStream.close(); } + /** + * Test snapshot capturing open files when an open file with active lease + * is deleted by the client. + */ + @Test (timeout = 120000) + public void testSnapshotsForOpenFilesAndDeletion() throws Exception { + // Construct the directory tree + final Path snapRootDir = new Path("/level_0_A"); + final String flumeFileName = "flume.log"; + final String hbaseFileName = "hbase.log"; + final String snap1Name = "snap_1"; + final String snap2Name = "snap_2"; + final String snap3Name = "snap_3"; + + // Create files and open streams + final Path flumeFile = new Path(snapRootDir, flumeFileName); + createFile(flumeFile); + final Path hbaseFile = new Path(snapRootDir, hbaseFileName); + createFile(hbaseFile); + FSDataOutputStream flumeOutputStream = fs.append(flumeFile); + FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile); + + // Create Snapshot S1 + final Path snap1Dir = SnapshotTestHelper.createSnapshot( + fs, snapRootDir, snap1Name); + final Path flumeS1Path = new Path(snap1Dir, flumeFileName); + final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen(); + final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName); + final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen(); + + // Verify if Snap S1 file length is same as the the current versions + Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS1, + fs.getFileStatus(hbaseS1Path).getLen()); + + long flumeFileWrittenDataLength = flumeFileLengthAfterS1; + long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1; + int newWriteLength = (int) (BLOCKSIZE * 1.5); + byte[] buf = new byte[newWriteLength]; + Random random = new Random(); + random.nextBytes(buf); + + // Write more data to files + flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf); + hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); + + // Create Snapshot S2 + final Path snap2Dir = SnapshotTestHelper.createSnapshot( + fs, snapRootDir, snap2Name); + final Path flumeS2Path = new Path(snap2Dir, flumeFileName); + final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName); + + // Verify current files length are same as all data written till now + final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen(); + Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2); + final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen(); + Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2); + + // Verify if Snap S2 file length is same as the current versions + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS2, + fs.getFileStatus(hbaseS2Path).getLen()); + + // Write more data to open files + writeToStream(flumeOutputStream, buf); + hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); + + // Verify old snapshots have point-in-time/frozen file + // lengths even after the current versions have moved forward. + Assert.assertEquals(flumeFileLengthAfterS1, + fs.getFileStatus(flumeS1Path).getLen()); + Assert.assertEquals(flumeFileLengthAfterS2, + fs.getFileStatus(flumeS2Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS1, + fs.getFileStatus(hbaseS1Path).getLen()); + Assert.assertEquals(hbaseFileLengthAfterS2, + fs.getFileStatus(hbaseS2Path).getLen()); + + // Delete flume current file. Snapshots should + // still have references to flume file. + boolean flumeFileDeleted = fs.delete(flumeFile, true); + Assert.assertTrue(flumeFileDeleted); + Assert.assertFalse(fs.exists(flumeFile)); + Assert.assertTrue(fs.exists(flumeS1Path)); + Assert.assertTrue(fs.exists(flumeS2Path)); + + SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap"); + fs.deleteSnapshot(snapRootDir, "tmp_snap"); + + // Delete snap_2. snap_1 still has reference to + // the flume file. + fs.deleteSnapshot(snapRootDir, snap2Name); + Assert.assertFalse(fs.exists(flumeS2Path)); + Assert.assertTrue(fs.exists(flumeS1Path)); + + // Delete snap_1. Now all traces of flume file + // is gone. + fs.deleteSnapshot(snapRootDir, snap1Name); + Assert.assertFalse(fs.exists(flumeS2Path)); + Assert.assertFalse(fs.exists(flumeS1Path)); + + // Create Snapshot S3 + final Path snap3Dir = SnapshotTestHelper.createSnapshot( + fs, snapRootDir, snap3Name); + final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName); + + // Verify live files length is same as all data written till now + final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen(); + Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3); + + // Write more data to open files + hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf); + + // Verify old snapshots have point-in-time/frozen file + // lengths even after the flume open file is deleted and + // the hbase live file has moved forward. + Assert.assertEquals(hbaseFileLengthAfterS3, + fs.getFileStatus(hbaseS3Path).getLen()); + Assert.assertEquals(hbaseFileWrittenDataLength, + fs.getFileStatus(hbaseFile).getLen()); + + hbaseOutputStream.close(); + } + private void restartNameNode() throws Exception { cluster.triggerBlockReports(); NameNode nameNode = cluster.getNameNode();