HDFS-12217. HDFS snapshots doesn't capture all open files when one of the open files is deleted.
This commit is contained in:
parent
02cd71ba9d
commit
52d7bafcf4
|
@ -30,4 +30,8 @@ public class SnapshotException extends IOException {
|
|||
public SnapshotException(final Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public SnapshotException(final String message, final Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4971,7 +4971,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
return blockId;
|
||||
}
|
||||
|
||||
private boolean isFileDeleted(INodeFile file) {
|
||||
boolean isFileDeleted(INodeFile file) {
|
||||
// Not in the inodeMap or in the snapshot but marked deleted.
|
||||
if (dir.getInode(file.getId()) == null) {
|
||||
return true;
|
||||
|
|
|
@ -162,18 +162,25 @@ public class LeaseManager {
|
|||
*
|
||||
* @return Set<INodesInPath>
|
||||
*/
|
||||
public Set<INodesInPath> getINodeWithLeases() {
|
||||
@VisibleForTesting
|
||||
Set<INodesInPath> getINodeWithLeases() throws IOException {
|
||||
return getINodeWithLeases(null);
|
||||
}
|
||||
|
||||
private synchronized INode[] getINodesWithLease() {
|
||||
int inodeCount = 0;
|
||||
INode[] inodes = new INode[leasesById.size()];
|
||||
List<INode> inodes = new ArrayList<>(leasesById.size());
|
||||
INode currentINode;
|
||||
for (long inodeId : leasesById.keySet()) {
|
||||
inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId);
|
||||
inodeCount++;
|
||||
currentINode = fsnamesystem.getFSDirectory().getInode(inodeId);
|
||||
// A file with an active lease could get deleted, or its
|
||||
// parent directories could get recursively deleted.
|
||||
if (currentINode != null &&
|
||||
currentINode.isFile() &&
|
||||
!fsnamesystem.isFileDeleted(currentINode.asFile())) {
|
||||
inodes.add(currentINode);
|
||||
}
|
||||
}
|
||||
return inodes;
|
||||
return inodes.toArray(new INode[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -186,7 +193,7 @@ public class LeaseManager {
|
|||
* @return Set<INodesInPath>
|
||||
*/
|
||||
public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
|
||||
ancestorDir) {
|
||||
ancestorDir) throws IOException {
|
||||
assert fsnamesystem.hasReadLock();
|
||||
final long startTimeMs = Time.monotonicNow();
|
||||
Set<INodesInPath> iipSet = new HashSet<>();
|
||||
|
@ -233,7 +240,7 @@ public class LeaseManager {
|
|||
try {
|
||||
iipSet.addAll(f.get());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("INode filter task encountered exception: ", e);
|
||||
throw new IOException("Failed to get files with active leases", e);
|
||||
}
|
||||
}
|
||||
final long endTimeMs = Time.monotonicNow();
|
||||
|
|
|
@ -195,11 +195,17 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
|
|||
s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
|
||||
|
||||
if (captureOpenFiles) {
|
||||
Set<INodesInPath> openFilesIIP =
|
||||
leaseManager.getINodeWithLeases(snapshotRoot);
|
||||
for (INodesInPath openFileIIP : openFilesIIP) {
|
||||
INodeFile openFile = openFileIIP.getLastINode().asFile();
|
||||
openFile.recordModification(openFileIIP.getLatestSnapshotId());
|
||||
try {
|
||||
Set<INodesInPath> openFilesIIP =
|
||||
leaseManager.getINodeWithLeases(snapshotRoot);
|
||||
for (INodesInPath openFileIIP : openFilesIIP) {
|
||||
INodeFile openFile = openFileIIP.getLastINode().asFile();
|
||||
openFile.recordModification(openFileIIP.getLatestSnapshotId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new SnapshotException("Failed to add snapshot: Unable to " +
|
||||
"capture all open files under the snapshot dir " +
|
||||
snapshotRoot.getFullPathName() + " for snapshot '" + name + "'", e);
|
||||
}
|
||||
}
|
||||
return s;
|
||||
|
|
|
@ -496,6 +496,132 @@ public class TestOpenFilesWithSnapshot {
|
|||
flumeOutputStream.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test snapshot capturing open files when an open file with active lease
|
||||
* is deleted by the client.
|
||||
*/
|
||||
@Test (timeout = 120000)
|
||||
public void testSnapshotsForOpenFilesAndDeletion() throws Exception {
|
||||
// Construct the directory tree
|
||||
final Path snapRootDir = new Path("/level_0_A");
|
||||
final String flumeFileName = "flume.log";
|
||||
final String hbaseFileName = "hbase.log";
|
||||
final String snap1Name = "snap_1";
|
||||
final String snap2Name = "snap_2";
|
||||
final String snap3Name = "snap_3";
|
||||
|
||||
// Create files and open streams
|
||||
final Path flumeFile = new Path(snapRootDir, flumeFileName);
|
||||
createFile(flumeFile);
|
||||
final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
|
||||
createFile(hbaseFile);
|
||||
FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
|
||||
FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
|
||||
|
||||
// Create Snapshot S1
|
||||
final Path snap1Dir = SnapshotTestHelper.createSnapshot(
|
||||
fs, snapRootDir, snap1Name);
|
||||
final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
|
||||
final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
|
||||
final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName);
|
||||
final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
|
||||
|
||||
// Verify if Snap S1 file length is same as the the current versions
|
||||
Assert.assertEquals(flumeFileLengthAfterS1,
|
||||
fs.getFileStatus(flumeS1Path).getLen());
|
||||
Assert.assertEquals(hbaseFileLengthAfterS1,
|
||||
fs.getFileStatus(hbaseS1Path).getLen());
|
||||
|
||||
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
|
||||
long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
|
||||
int newWriteLength = (int) (BLOCKSIZE * 1.5);
|
||||
byte[] buf = new byte[newWriteLength];
|
||||
Random random = new Random();
|
||||
random.nextBytes(buf);
|
||||
|
||||
// Write more data to files
|
||||
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
||||
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||
|
||||
// Create Snapshot S2
|
||||
final Path snap2Dir = SnapshotTestHelper.createSnapshot(
|
||||
fs, snapRootDir, snap2Name);
|
||||
final Path flumeS2Path = new Path(snap2Dir, flumeFileName);
|
||||
final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName);
|
||||
|
||||
// Verify current files length are same as all data written till now
|
||||
final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
|
||||
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
|
||||
final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
|
||||
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
|
||||
|
||||
// Verify if Snap S2 file length is same as the current versions
|
||||
Assert.assertEquals(flumeFileLengthAfterS2,
|
||||
fs.getFileStatus(flumeS2Path).getLen());
|
||||
Assert.assertEquals(hbaseFileLengthAfterS2,
|
||||
fs.getFileStatus(hbaseS2Path).getLen());
|
||||
|
||||
// Write more data to open files
|
||||
writeToStream(flumeOutputStream, buf);
|
||||
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||
|
||||
// Verify old snapshots have point-in-time/frozen file
|
||||
// lengths even after the current versions have moved forward.
|
||||
Assert.assertEquals(flumeFileLengthAfterS1,
|
||||
fs.getFileStatus(flumeS1Path).getLen());
|
||||
Assert.assertEquals(flumeFileLengthAfterS2,
|
||||
fs.getFileStatus(flumeS2Path).getLen());
|
||||
Assert.assertEquals(hbaseFileLengthAfterS1,
|
||||
fs.getFileStatus(hbaseS1Path).getLen());
|
||||
Assert.assertEquals(hbaseFileLengthAfterS2,
|
||||
fs.getFileStatus(hbaseS2Path).getLen());
|
||||
|
||||
// Delete flume current file. Snapshots should
|
||||
// still have references to flume file.
|
||||
boolean flumeFileDeleted = fs.delete(flumeFile, true);
|
||||
Assert.assertTrue(flumeFileDeleted);
|
||||
Assert.assertFalse(fs.exists(flumeFile));
|
||||
Assert.assertTrue(fs.exists(flumeS1Path));
|
||||
Assert.assertTrue(fs.exists(flumeS2Path));
|
||||
|
||||
SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap");
|
||||
fs.deleteSnapshot(snapRootDir, "tmp_snap");
|
||||
|
||||
// Delete snap_2. snap_1 still has reference to
|
||||
// the flume file.
|
||||
fs.deleteSnapshot(snapRootDir, snap2Name);
|
||||
Assert.assertFalse(fs.exists(flumeS2Path));
|
||||
Assert.assertTrue(fs.exists(flumeS1Path));
|
||||
|
||||
// Delete snap_1. Now all traces of flume file
|
||||
// is gone.
|
||||
fs.deleteSnapshot(snapRootDir, snap1Name);
|
||||
Assert.assertFalse(fs.exists(flumeS2Path));
|
||||
Assert.assertFalse(fs.exists(flumeS1Path));
|
||||
|
||||
// Create Snapshot S3
|
||||
final Path snap3Dir = SnapshotTestHelper.createSnapshot(
|
||||
fs, snapRootDir, snap3Name);
|
||||
final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName);
|
||||
|
||||
// Verify live files length is same as all data written till now
|
||||
final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
|
||||
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
|
||||
|
||||
// Write more data to open files
|
||||
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||
|
||||
// Verify old snapshots have point-in-time/frozen file
|
||||
// lengths even after the flume open file is deleted and
|
||||
// the hbase live file has moved forward.
|
||||
Assert.assertEquals(hbaseFileLengthAfterS3,
|
||||
fs.getFileStatus(hbaseS3Path).getLen());
|
||||
Assert.assertEquals(hbaseFileWrittenDataLength,
|
||||
fs.getFileStatus(hbaseFile).getLen());
|
||||
|
||||
hbaseOutputStream.close();
|
||||
}
|
||||
|
||||
private void restartNameNode() throws Exception {
|
||||
cluster.triggerBlockReports();
|
||||
NameNode nameNode = cluster.getNameNode();
|
||||
|
|
Loading…
Reference in New Issue