HDFS-12217. HDFS snapshots doesn't capture all open files when one of the open files is deleted.
(cherry picked from commit 52d7bafcf4
)
This commit is contained in:
parent
686bdc48b8
commit
a77ae7abdb
|
@ -30,4 +30,8 @@ public class SnapshotException extends IOException {
|
||||||
public SnapshotException(final Throwable cause) {
|
public SnapshotException(final Throwable cause) {
|
||||||
super(cause);
|
super(cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SnapshotException(final String message, final Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4779,7 +4779,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return blockId;
|
return blockId;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isFileDeleted(INodeFile file) {
|
boolean isFileDeleted(INodeFile file) {
|
||||||
// Not in the inodeMap or in the snapshot but marked deleted.
|
// Not in the inodeMap or in the snapshot but marked deleted.
|
||||||
if (dir.getInode(file.getId()) == null) {
|
if (dir.getInode(file.getId()) == null) {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -213,18 +213,25 @@ public class LeaseManager {
|
||||||
*
|
*
|
||||||
* @return Set<INodesInPath>
|
* @return Set<INodesInPath>
|
||||||
*/
|
*/
|
||||||
public Set<INodesInPath> getINodeWithLeases() {
|
@VisibleForTesting
|
||||||
|
Set<INodesInPath> getINodeWithLeases() throws IOException {
|
||||||
return getINodeWithLeases(null);
|
return getINodeWithLeases(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized INode[] getINodesWithLease() {
|
private synchronized INode[] getINodesWithLease() {
|
||||||
int inodeCount = 0;
|
List<INode> inodes = new ArrayList<>(leasesById.size());
|
||||||
INode[] inodes = new INode[leasesById.size()];
|
INode currentINode;
|
||||||
for (long inodeId : leasesById.keySet()) {
|
for (long inodeId : leasesById.keySet()) {
|
||||||
inodes[inodeCount] = fsnamesystem.getFSDirectory().getInode(inodeId);
|
currentINode = fsnamesystem.getFSDirectory().getInode(inodeId);
|
||||||
inodeCount++;
|
// A file with an active lease could get deleted, or its
|
||||||
|
// parent directories could get recursively deleted.
|
||||||
|
if (currentINode != null &&
|
||||||
|
currentINode.isFile() &&
|
||||||
|
!fsnamesystem.isFileDeleted(currentINode.asFile())) {
|
||||||
|
inodes.add(currentINode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return inodes;
|
return inodes.toArray(new INode[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -237,7 +244,7 @@ public class LeaseManager {
|
||||||
* @return Set<INodesInPath>
|
* @return Set<INodesInPath>
|
||||||
*/
|
*/
|
||||||
public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
|
public Set<INodesInPath> getINodeWithLeases(final INodeDirectory
|
||||||
ancestorDir) {
|
ancestorDir) throws IOException {
|
||||||
assert fsnamesystem.hasReadLock();
|
assert fsnamesystem.hasReadLock();
|
||||||
final long startTimeMs = Time.monotonicNow();
|
final long startTimeMs = Time.monotonicNow();
|
||||||
Set<INodesInPath> iipSet = new HashSet<>();
|
Set<INodesInPath> iipSet = new HashSet<>();
|
||||||
|
@ -284,7 +291,7 @@ public class LeaseManager {
|
||||||
try {
|
try {
|
||||||
iipSet.addAll(f.get());
|
iipSet.addAll(f.get());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("INode filter task encountered exception: ", e);
|
throw new IOException("Failed to get files with active leases", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final long endTimeMs = Time.monotonicNow();
|
final long endTimeMs = Time.monotonicNow();
|
||||||
|
|
|
@ -195,11 +195,17 @@ public class DirectorySnapshottableFeature extends DirectoryWithSnapshotFeature
|
||||||
s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
|
s.getRoot().setModificationTime(now, Snapshot.CURRENT_STATE_ID);
|
||||||
|
|
||||||
if (captureOpenFiles) {
|
if (captureOpenFiles) {
|
||||||
Set<INodesInPath> openFilesIIP =
|
try {
|
||||||
leaseManager.getINodeWithLeases(snapshotRoot);
|
Set<INodesInPath> openFilesIIP =
|
||||||
for (INodesInPath openFileIIP : openFilesIIP) {
|
leaseManager.getINodeWithLeases(snapshotRoot);
|
||||||
INodeFile openFile = openFileIIP.getLastINode().asFile();
|
for (INodesInPath openFileIIP : openFilesIIP) {
|
||||||
openFile.recordModification(openFileIIP.getLatestSnapshotId());
|
INodeFile openFile = openFileIIP.getLastINode().asFile();
|
||||||
|
openFile.recordModification(openFileIIP.getLatestSnapshotId());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new SnapshotException("Failed to add snapshot: Unable to " +
|
||||||
|
"capture all open files under the snapshot dir " +
|
||||||
|
snapshotRoot.getFullPathName() + " for snapshot '" + name + "'", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -277,7 +278,7 @@ public class TestLeaseManager {
|
||||||
|
|
||||||
private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
|
private void testInodeWithLeasesAtScaleImpl(final LeaseManager leaseManager,
|
||||||
final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
|
final FSDirectory fsDirectory, INodeDirectory ancestorDirectory,
|
||||||
int scale) {
|
int scale) throws IOException {
|
||||||
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
|
verifyINodeLeaseCounts(leaseManager, ancestorDirectory, 0, 0, 0);
|
||||||
|
|
||||||
Set<Long> iNodeIds = new HashSet<>();
|
Set<Long> iNodeIds = new HashSet<>();
|
||||||
|
@ -388,7 +389,8 @@ public class TestLeaseManager {
|
||||||
|
|
||||||
private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
|
private void verifyINodeLeaseCounts(final LeaseManager leaseManager,
|
||||||
INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
|
INodeDirectory ancestorDirectory, int iNodeIdWithLeaseCount,
|
||||||
int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount) {
|
int iNodeWithLeaseCount, int iNodeUnderAncestorLeaseCount)
|
||||||
|
throws IOException {
|
||||||
assertEquals(iNodeIdWithLeaseCount,
|
assertEquals(iNodeIdWithLeaseCount,
|
||||||
leaseManager.getINodeIdWithLeases().size());
|
leaseManager.getINodeIdWithLeases().size());
|
||||||
assertEquals(iNodeWithLeaseCount,
|
assertEquals(iNodeWithLeaseCount,
|
||||||
|
|
|
@ -496,6 +496,132 @@ public class TestOpenFilesWithSnapshot {
|
||||||
flumeOutputStream.close();
|
flumeOutputStream.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test snapshot capturing open files when an open file with active lease
|
||||||
|
* is deleted by the client.
|
||||||
|
*/
|
||||||
|
@Test (timeout = 120000)
|
||||||
|
public void testSnapshotsForOpenFilesAndDeletion() throws Exception {
|
||||||
|
// Construct the directory tree
|
||||||
|
final Path snapRootDir = new Path("/level_0_A");
|
||||||
|
final String flumeFileName = "flume.log";
|
||||||
|
final String hbaseFileName = "hbase.log";
|
||||||
|
final String snap1Name = "snap_1";
|
||||||
|
final String snap2Name = "snap_2";
|
||||||
|
final String snap3Name = "snap_3";
|
||||||
|
|
||||||
|
// Create files and open streams
|
||||||
|
final Path flumeFile = new Path(snapRootDir, flumeFileName);
|
||||||
|
createFile(flumeFile);
|
||||||
|
final Path hbaseFile = new Path(snapRootDir, hbaseFileName);
|
||||||
|
createFile(hbaseFile);
|
||||||
|
FSDataOutputStream flumeOutputStream = fs.append(flumeFile);
|
||||||
|
FSDataOutputStream hbaseOutputStream = fs.append(hbaseFile);
|
||||||
|
|
||||||
|
// Create Snapshot S1
|
||||||
|
final Path snap1Dir = SnapshotTestHelper.createSnapshot(
|
||||||
|
fs, snapRootDir, snap1Name);
|
||||||
|
final Path flumeS1Path = new Path(snap1Dir, flumeFileName);
|
||||||
|
final long flumeFileLengthAfterS1 = fs.getFileStatus(flumeFile).getLen();
|
||||||
|
final Path hbaseS1Path = new Path(snap1Dir, hbaseFileName);
|
||||||
|
final long hbaseFileLengthAfterS1 = fs.getFileStatus(hbaseFile).getLen();
|
||||||
|
|
||||||
|
// Verify if Snap S1 file length is same as the the current versions
|
||||||
|
Assert.assertEquals(flumeFileLengthAfterS1,
|
||||||
|
fs.getFileStatus(flumeS1Path).getLen());
|
||||||
|
Assert.assertEquals(hbaseFileLengthAfterS1,
|
||||||
|
fs.getFileStatus(hbaseS1Path).getLen());
|
||||||
|
|
||||||
|
long flumeFileWrittenDataLength = flumeFileLengthAfterS1;
|
||||||
|
long hbaseFileWrittenDataLength = hbaseFileLengthAfterS1;
|
||||||
|
int newWriteLength = (int) (BLOCKSIZE * 1.5);
|
||||||
|
byte[] buf = new byte[newWriteLength];
|
||||||
|
Random random = new Random();
|
||||||
|
random.nextBytes(buf);
|
||||||
|
|
||||||
|
// Write more data to files
|
||||||
|
flumeFileWrittenDataLength += writeToStream(flumeOutputStream, buf);
|
||||||
|
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||||
|
|
||||||
|
// Create Snapshot S2
|
||||||
|
final Path snap2Dir = SnapshotTestHelper.createSnapshot(
|
||||||
|
fs, snapRootDir, snap2Name);
|
||||||
|
final Path flumeS2Path = new Path(snap2Dir, flumeFileName);
|
||||||
|
final Path hbaseS2Path = new Path(snap2Dir, hbaseFileName);
|
||||||
|
|
||||||
|
// Verify current files length are same as all data written till now
|
||||||
|
final long flumeFileLengthAfterS2 = fs.getFileStatus(flumeFile).getLen();
|
||||||
|
Assert.assertEquals(flumeFileWrittenDataLength, flumeFileLengthAfterS2);
|
||||||
|
final long hbaseFileLengthAfterS2 = fs.getFileStatus(hbaseFile).getLen();
|
||||||
|
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS2);
|
||||||
|
|
||||||
|
// Verify if Snap S2 file length is same as the current versions
|
||||||
|
Assert.assertEquals(flumeFileLengthAfterS2,
|
||||||
|
fs.getFileStatus(flumeS2Path).getLen());
|
||||||
|
Assert.assertEquals(hbaseFileLengthAfterS2,
|
||||||
|
fs.getFileStatus(hbaseS2Path).getLen());
|
||||||
|
|
||||||
|
// Write more data to open files
|
||||||
|
writeToStream(flumeOutputStream, buf);
|
||||||
|
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||||
|
|
||||||
|
// Verify old snapshots have point-in-time/frozen file
|
||||||
|
// lengths even after the current versions have moved forward.
|
||||||
|
Assert.assertEquals(flumeFileLengthAfterS1,
|
||||||
|
fs.getFileStatus(flumeS1Path).getLen());
|
||||||
|
Assert.assertEquals(flumeFileLengthAfterS2,
|
||||||
|
fs.getFileStatus(flumeS2Path).getLen());
|
||||||
|
Assert.assertEquals(hbaseFileLengthAfterS1,
|
||||||
|
fs.getFileStatus(hbaseS1Path).getLen());
|
||||||
|
Assert.assertEquals(hbaseFileLengthAfterS2,
|
||||||
|
fs.getFileStatus(hbaseS2Path).getLen());
|
||||||
|
|
||||||
|
// Delete flume current file. Snapshots should
|
||||||
|
// still have references to flume file.
|
||||||
|
boolean flumeFileDeleted = fs.delete(flumeFile, true);
|
||||||
|
Assert.assertTrue(flumeFileDeleted);
|
||||||
|
Assert.assertFalse(fs.exists(flumeFile));
|
||||||
|
Assert.assertTrue(fs.exists(flumeS1Path));
|
||||||
|
Assert.assertTrue(fs.exists(flumeS2Path));
|
||||||
|
|
||||||
|
SnapshotTestHelper.createSnapshot(fs, snapRootDir, "tmp_snap");
|
||||||
|
fs.deleteSnapshot(snapRootDir, "tmp_snap");
|
||||||
|
|
||||||
|
// Delete snap_2. snap_1 still has reference to
|
||||||
|
// the flume file.
|
||||||
|
fs.deleteSnapshot(snapRootDir, snap2Name);
|
||||||
|
Assert.assertFalse(fs.exists(flumeS2Path));
|
||||||
|
Assert.assertTrue(fs.exists(flumeS1Path));
|
||||||
|
|
||||||
|
// Delete snap_1. Now all traces of flume file
|
||||||
|
// is gone.
|
||||||
|
fs.deleteSnapshot(snapRootDir, snap1Name);
|
||||||
|
Assert.assertFalse(fs.exists(flumeS2Path));
|
||||||
|
Assert.assertFalse(fs.exists(flumeS1Path));
|
||||||
|
|
||||||
|
// Create Snapshot S3
|
||||||
|
final Path snap3Dir = SnapshotTestHelper.createSnapshot(
|
||||||
|
fs, snapRootDir, snap3Name);
|
||||||
|
final Path hbaseS3Path = new Path(snap3Dir, hbaseFileName);
|
||||||
|
|
||||||
|
// Verify live files length is same as all data written till now
|
||||||
|
final long hbaseFileLengthAfterS3 = fs.getFileStatus(hbaseFile).getLen();
|
||||||
|
Assert.assertEquals(hbaseFileWrittenDataLength, hbaseFileLengthAfterS3);
|
||||||
|
|
||||||
|
// Write more data to open files
|
||||||
|
hbaseFileWrittenDataLength += writeToStream(hbaseOutputStream, buf);
|
||||||
|
|
||||||
|
// Verify old snapshots have point-in-time/frozen file
|
||||||
|
// lengths even after the flume open file is deleted and
|
||||||
|
// the hbase live file has moved forward.
|
||||||
|
Assert.assertEquals(hbaseFileLengthAfterS3,
|
||||||
|
fs.getFileStatus(hbaseS3Path).getLen());
|
||||||
|
Assert.assertEquals(hbaseFileWrittenDataLength,
|
||||||
|
fs.getFileStatus(hbaseFile).getLen());
|
||||||
|
|
||||||
|
hbaseOutputStream.close();
|
||||||
|
}
|
||||||
|
|
||||||
private void restartNameNode() throws Exception {
|
private void restartNameNode() throws Exception {
|
||||||
cluster.triggerBlockReports();
|
cluster.triggerBlockReports();
|
||||||
NameNode nameNode = cluster.getNameNode();
|
NameNode nameNode = cluster.getNameNode();
|
||||||
|
|
Loading…
Reference in New Issue